Tổng hợp nhiều danh sách python
Trong bài đăng này, chúng tôi sẽ giải thích một số phương pháp và kỹ thuật có thể được sử dụng để lấy tổng của hai phần tử danh sách trong danh sách cuối cùng Show
Danh sách Python là tập hợp các phần tử có thể lặp lại có thể chứa bất kỳ loại mục nào trong đó. Ở đây, chúng ta sẽ tính tổng của hai hoặc nhiều phần tử Danh sách theo chỉ mục và tạo một danh sách mới từ tổng. Giả sử chúng ta có hai danh sách chứa nhiều mục kiểu số Tạo hai danh sách Python
Sử dụng hai danh sách trên, chúng tôi muốn tạo danh sách đầu ra sẽ giống như bên dưới
Dưới đây là các phương pháp có thể được sử dụng để lấy danh sách mới sẽ chứa tổng các phần tử với chỉ số tương ứng của chúng Sử dụng zip () và Hiểu danh sách để lấy tổng của hai danh sáchĐây là một cách Pythonic để tạo một danh sách mới sẽ chứa tổng các mục. Chúng tôi sẽ sử dụng chức năng zip() và Danh sách hiểu cho điều đó Mã ví dụ
đầu ra
Nếu bạn có một danh sách các danh sách và muốn lấy danh sách chứa tổng của hai danh sách thì bạn có thể sử dụng ví dụ mã bên dưới
đầu ra
Phương pháp 2. Sử dụng các hàm map() và lambdaChúng ta cũng có thể sử dụng các hàm map() và lambda để tạo một danh sách chứa tổng của hai danh sách. Chúng tôi đã tạo hai danh sách list1 và list2 tại đây và tạo kết quả danh sách mới bằng cách sử dụng các hàm map và lambda
Ví dụ về mã đầy đủ
đầu ra
Phương pháp 3. Sử dụng hàm add() numpyNếu bạn đang sử dụng numty trong dự án của mình thì bạn có thể sử dụng hàm add() của nó để tạo tổng của hai danh sách. Dưới đây là ví dụ mã Hướng dẫn lập trình Beam dành cho người dùng Beam muốn sử dụng Beam SDK để tạo quy trình xử lý dữ liệu. Nó cung cấp hướng dẫn sử dụng các lớp Beam SDK để xây dựng và kiểm tra quy trình của bạn. Hướng dẫn lập trình không nhằm mục đích tham khảo toàn diện, mà là hướng dẫn cấp cao, không phụ thuộc vào ngôn ngữ để lập trình quy trình Beam của bạn. Khi hướng dẫn lập trình được điền vào, văn bản sẽ bao gồm các mẫu mã bằng nhiều ngôn ngữ để giúp minh họa cách triển khai các khái niệm Beam trong quy trình của bạn Nếu bạn muốn giới thiệu ngắn gọn về các khái niệm cơ bản của Beam trước khi đọc hướng dẫn lập trình, hãy xem trang Khái niệm cơ bản về mô hình Beam Thích ứng cho
SDK Python hỗ trợ Python 3. 7, 3. 8, 3. 9 và 3. 10 Go SDK hỗ trợ Go v1. 18+. Bản phát hành SDK 2. 32. 0 là phiên bản thử nghiệm cuối cùng Typescript SDK hỗ trợ Node v16+ và vẫn đang thử nghiệm 1. Tổng quanĐể sử dụng Beam, trước tiên bạn cần tạo một chương trình trình điều khiển bằng cách sử dụng các lớp trong một trong các Beam SDK. Chương trình trình điều khiển của bạn xác định đường dẫn của bạn, bao gồm tất cả đầu vào, biến đổi và đầu ra; . Chúng bao gồm Trình chạy đường ống, đến lượt nó, xác định đường ống của bạn sẽ chạy trên back-end nào Beam SDK cung cấp một số khái niệm trừu tượng giúp đơn giản hóa cơ chế xử lý dữ liệu phân tán quy mô lớn. Các bản tóm tắt Beam giống nhau hoạt động với cả nguồn dữ liệu hàng loạt và luồng dữ liệu. Khi bạn tạo quy trình Beam, bạn có thể nghĩ về tác vụ xử lý dữ liệu của mình theo các khái niệm trừu tượng này. Chúng bao gồm
Một chương trình điều khiển Beam điển hình hoạt động như sau
Khi bạn chạy chương trình trình điều khiển Beam, Trình chạy đường ống mà bạn chỉ định sẽ xây dựng biểu đồ quy trình công việc của đường ống của bạn dựa trên 73 đối tượng bạn đã tạo và các biến đổi mà bạn đã áp dụng. Biểu đồ đó sau đó được thực thi bằng cách sử dụng back-end xử lý phân tán thích hợp, trở thành một “công việc” không đồng bộ (hoặc tương đương) trên back-end đó2. Tạo một đường ống dẫnPhần trừu tượng 68 gói gọn tất cả dữ liệu và các bước trong tác vụ xử lý dữ liệu của bạn. Chương trình trình điều khiển Beam của bạn thường bắt đầu bằng cách xây dựng một đối tượng Pipeline Pipeline , sau đó sử dụng đối tượng đó làm cơ sở để tạo . Để sử dụng Beam, trước tiên chương trình trình điều khiển của bạn phải tạo một phiên bản của lớp Beam SDK 68 (thường là trong hàm 09). Khi bạn tạo 68, bạn cũng sẽ cần đặt một số tùy chọn cấu hình. Bạn có thể đặt các tùy chọn cấu hình cho đường dẫn của mình theo chương trình, nhưng việc đặt các tùy chọn trước (hoặc đọc chúng từ dòng lệnh) và chuyển chúng đến đối tượng 68 khi bạn tạo đối tượng thường dễ dàng hơnĐường ống trong TypeScript API chỉ đơn giản là một chức năng sẽ được gọi với một đối tượng `root` duy nhất và được truyền cho phương thức `run` của Người chạy
2. 1. Định cấu hình tùy chọn đường ốngSử dụng các tùy chọn đường ống để định cấu hình các khía cạnh khác nhau của đường ống của bạn, chẳng hạn như trình chạy đường ống sẽ thực thi đường ống của bạn và bất kỳ cấu hình cụ thể nào của trình chạy theo yêu cầu của trình chạy đã chọn. Các tùy chọn quy trình của bạn có khả năng sẽ bao gồm thông tin như ID dự án của bạn hoặc vị trí lưu trữ tệp Khi bạn chạy quy trình trên một trình chạy mà bạn chọn, một bản sao của PipelineOptions sẽ có sẵn cho mã của bạn. Ví dụ: nếu bạn thêm tham số PipelineOptions vào phương thức 12 của DoFn, tham số đó sẽ được hệ thống điền vào2. 1. 1. Đặt PipelineOptions từ đối số dòng lệnhMặc dù bạn có thể định cấu hình quy trình của mình bằng cách tạo một đối tượng 13 và đặt trường trực tiếp, SDK Beam bao gồm trình phân tích cú pháp dòng lệnh mà bạn có thể sử dụng để đặt trường trong 13 bằng cách sử dụng đối số dòng lệnhĐể đọc các tùy chọn từ dòng lệnh, hãy xây dựng đối tượng 13 của bạn như được minh họa trong mã ví dụ sauSử dụng cờ Go để phân tích các đối số dòng lệnh để định cấu hình đường dẫn của bạn. Cờ phải được phân tích cú pháp trước khi 16 được gọiBất kỳ đối tượng Javascript nào cũng có thể được sử dụng làm tùy chọn đường dẫn. Người ta có thể xây dựng một cách thủ công, nhưng cũng thường truyền một đối tượng được tạo từ các tùy chọn dòng lệnh, chẳng hạn như 17
Điều này diễn giải các đối số dòng lệnh tuân theo định dạng
Xây dựng 13 của bạn theo cách này cho phép bạn chỉ định bất kỳ tùy chọn nào làm đối số dòng lệnhXác định các biến cờ theo cách này cho phép bạn chỉ định bất kỳ tùy chọn nào làm đối số dòng lệnh
2. 1. 2. Tạo tùy chọn tùy chỉnhBạn có thể thêm các tùy chọn tùy chỉnh của riêng mình ngoài 13 tiêu chuẩnĐể thêm các tùy chọn của riêng bạn, hãy xác định giao diện với các phương thức getter và setter cho từng tùy chọn Ví dụ sau đây cho thấy cách thêm tùy chọn tùy chỉnh 21 và 22
0 1 2Bạn cũng có thể chỉ định mô tả, mô tả này sẽ xuất hiện khi người dùng chuyển 23 làm đối số dòng lệnh và giá trị mặc địnhBạn đặt mô tả và giá trị mặc định bằng chú thích, như sau 3 4 5Đối với Python, bạn cũng có thể chỉ cần phân tích các tùy chọn tùy chỉnh của mình bằng argparse; Bạn nên đăng ký giao diện của mình với 24 và sau đó chuyển giao diện khi tạo đối tượng 13. Khi bạn đăng ký giao diện của mình với 24, thì 23 có thể tìm thấy giao diện tùy chọn tùy chỉnh của bạn và thêm nó vào đầu ra của lệnh 23. 24 cũng sẽ xác thực rằng các tùy chọn tùy chỉnh của bạn tương thích với tất cả các tùy chọn đã đăng ký khácMã ví dụ sau đây cho biết cách đăng ký giao diện tùy chọn tùy chỉnh của bạn với 24 6Bây giờ quy trình của bạn có thể chấp nhận 31 và 32 làm đối số dòng lệnh3. bộ sưu tậpPhần trừu tượng PCollection ______9_______73 đại diện cho một tập dữ liệu đa phần tử có khả năng phân phối. Bạn có thể coi 73 là dữ liệu “đường ống”; . Như vậy, nếu bạn muốn làm việc với dữ liệu trong quy trình của mình, dữ liệu đó phải ở dạng 73. Sau khi bạn đã tạo 68 của mình, bạn sẽ cần bắt đầu bằng cách tạo ít nhất một 73 ở một dạng nào đó. 73 bạn tạo đóng vai trò là đầu vào cho hoạt động đầu tiên trong quy trình của bạn3. 1. Tạo một PCCollectionBạn tạo một 73 bằng cách đọc dữ liệu từ một nguồn bên ngoài bằng cách sử dụng Beam, hoặc bạn có thể tạo một 73 dữ liệu được lưu trữ trong một lớp bộ sưu tập trong bộ nhớ trong chương trình trình điều khiển của bạn. Cái trước thường là cách một đường ống sản xuất sẽ nhập dữ liệu; . Cái sau chủ yếu hữu ích cho mục đích thử nghiệm và gỡ lỗi3. 1. 1. Đọc từ một nguồn bên ngoàiĐể đọc từ một nguồn bên ngoài, bạn sử dụng một trong các. Các bộ điều hợp khác nhau về cách sử dụng chính xác của chúng, nhưng tất cả chúng đều đọc từ một số nguồn dữ liệu bên ngoài và trả về một 73 có các thành phần đại diện cho các bản ghi dữ liệu trong nguồn đóMỗi bộ điều hợp nguồn dữ liệu có một biến đổi 43; . ______48_______45 ______48_______46 47 48 , . Đây là cách bạn sẽ đăng ký 45 46 47 48 to your 68 root to create a 73: 7 8 9 0Xem để tìm hiểu thêm về cách đọc từ các nguồn dữ liệu khác nhau được Beam SDK hỗ trợ 3. 1. 2. Tạo PCollection từ dữ liệu trong bộ nhớĐể tạo một 73 từ một Java 59 trong bộ nhớ, bạn sử dụng biến đổi 94 do Beam cung cấp. Giống như 43 của bộ điều hợp dữ liệu, bạn áp dụng trực tiếp 94 cho chính đối tượng 68 của mìnhLà tham số, 94 chấp nhận đối tượng Java 59 và một đối tượng 66. 66 chỉ định cách các phần tử trong 59 nênĐể tạo một 73 từ một 70 trong bộ nhớ, bạn sử dụng biến đổi 94 do Beam cung cấp. Áp dụng biến đổi này trực tiếp cho chính đối tượng 68 của bạnĐể tạo một 73 từ một 74 trong bộ nhớ, bạn sử dụng biến đổi 75 do Beam cung cấp. Chuyển đường dẫn 76 và 74 tới biến đổi nàyĐể tạo một 73 từ một 79 trong bộ nhớ, bạn sử dụng biến đổi 94 do Beam cung cấp. Áp dụng biến đổi này trực tiếp cho đối tượng 81 của bạnMã ví dụ sau cho biết cách tạo 73 từ bộ nhớ trong 83 70 74 . 79: 1 2 3 43. 2. Đặc tính bộ sưu tậpA 73 được sở hữu bởi đối tượng cụ thể 68 mà nó được tạo ra; . Ở một số khía cạnh, một lớp 73 giống như một lớp 59. Tuy nhiên, một 73 có thể khác nhau theo một số cách chính. 3. 2. 1. Loại nguyên tốCác phần tử của một 73 có thể thuộc bất kỳ loại nào, nhưng tất cả phải cùng loại. Tuy nhiên, để hỗ trợ xử lý phân tán, Beam cần có khả năng mã hóa từng phần tử riêng lẻ dưới dạng chuỗi byte (để các phần tử có thể được chuyển cho các công nhân phân tán). Beam SDK cung cấp cơ chế mã hóa dữ liệu bao gồm mã hóa tích hợp cho các loại thường được sử dụng cũng như hỗ trợ chỉ định mã hóa tùy chỉnh khi cần3. 2. 2. lược đồ phần tửTrong nhiều trường hợp, loại phần tử trong 73 có cấu trúc có thể xem xét nội tâm. Ví dụ là các bản ghi JSON, Bộ đệm giao thức, Avro và cơ sở dữ liệu. Các lược đồ cung cấp một cách để thể hiện các loại dưới dạng một tập hợp các trường được đặt tên, cho phép các tập hợp biểu cảm hơn3. 2. 3. tính bất biếnMột 73 là bất biến. Sau khi tạo, bạn không thể thêm, xóa hoặc thay đổi các thành phần riêng lẻ. Biến đổi Beam có thể xử lý từng phần tử của 73 và tạo dữ liệu đường ống mới (dưới dạng 73 mới), nhưng nó không sử dụng hoặc sửa đổi bộ sưu tập đầu vào ban đầu
3. 2. 4. Truy cập ngẫu nhiênMột 73 không hỗ trợ truy cập ngẫu nhiên vào các phần tử riêng lẻ. Thay vào đó, Beam Transforms xem xét từng phần tử trong một 73 riêng lẻ3. 2. 5. Kích thước và giới hạnMột 73 là một “túi” phần tử lớn, bất biến. Không có giới hạn trên về số lượng phần tử mà một 73 có thể chứa; Một 73 có thể có kích thước giới hạn hoặc không giới hạn. Một 73 có giới hạn biểu thị một tập dữ liệu có kích thước cố định, đã biết, trong khi một 73 không giới hạn biểu thị một tập dữ liệu có kích thước không giới hạn. Việc một 73 có giới hạn hay không có giới hạn phụ thuộc vào nguồn của tập dữ liệu mà nó đại diện. Việc đọc từ nguồn dữ liệu lô, chẳng hạn như tệp hoặc cơ sở dữ liệu, sẽ tạo ra một 73 có giới hạn. Việc đọc từ nguồn dữ liệu phát trực tuyến hoặc cập nhật liên tục, chẳng hạn như Pub/Sub hoặc Kafka, sẽ tạo ra một 73 không giới hạn (trừ khi bạn yêu cầu rõ ràng là không làm như vậy)Bản chất bị chặn (hoặc không bị chặn) của 73 của bạn ảnh hưởng đến cách Beam xử lý dữ liệu của bạn. Có thể xử lý một 73 có giới hạn bằng cách sử dụng một công việc hàng loạt, công việc này có thể đọc toàn bộ tập dữ liệu một lần và thực hiện xử lý trong một công việc có độ dài hữu hạn. Một 73 không giới hạn phải được xử lý bằng cách sử dụng công việc phát trực tuyến chạy liên tục, vì toàn bộ bộ sưu tập không bao giờ có sẵn để xử lý cùng một lúcBeam sử dụng để phân chia một 73 cập nhật liên tục thành các cửa sổ logic có kích thước hữu hạn. Các cửa sổ logic này được xác định bởi một số đặc điểm được liên kết với một phần tử dữ liệu, chẳng hạn như dấu thời gian. Các phép biến đổi tổng hợp (chẳng hạn như 14 và 15) hoạt động trên cơ sở mỗi cửa sổ — khi tập dữ liệu được tạo, chúng xử lý từng 73 dưới dạng chuỗi liên tiếp của các cửa sổ hữu hạn này3. 2. 6. dấu thời gian phần tửEach element in a 73 has an associated intrinsic timestamp. The timestamp for each element is initially assigned by the that creates the 73. Sources that create an unbounded 73 often assign each new element a timestamp that corresponds to when the element was read or added
Timestamps are useful for a 73 that contains elements with an inherent notion of time. If your pipeline is reading a stream of events, like Tweets or other social media messages, each element might use the time the event was posted as the element timestampYou can manually assign timestamps to the elements of a 73 if the source doesn’t do it for you. You’ll want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a “time” field in a server log entry). Beam has that take a 73 as input and output an identical 73 with timestamps attached; see for more information about how to do so4. TransformsTransforms are the operations in your pipeline, and provide a generic processing framework. You provide processing logic in the form of a function object (colloquially referred to as “user code”), and your user code is applied to each element of an input 73 (or more than one 73). Depending on the pipeline runner and back-end that you choose, many different workers across a cluster may execute instances of your user code in parallel. The user code running on each worker generates the output elements that are ultimately added to the final output 73 that the transform produces
The Beam SDKs contain a number of different transforms that you can apply to your pipeline’s 73s. These include general-purpose core transforms, such as or . There are also pre-written included in the SDKs, which combine one or more of the core transforms in a useful processing pattern, such as counting or combining elements in a collection. You can also define your own more complex composite transforms to fit your pipeline’s exact use case4. 1. Applying transformsTo invoke a transform, you must apply it to the input 73. Each transform in the Beam SDKs has a generic 31 method (or pipe operator 32) . Invoking multiple Beam transforms is similar to method chaining, but with one slight difference. You apply the transform to the input 73, passing the transform itself as an argument, and the operation returns the output 73. This takes the general form. 5 6 7 8Bởi vì Beam sử dụng một phương pháp chung 31 cho 73, nên bạn có thể thực hiện cả hai phép biến đổi chuỗi một cách tuần tự và cũng có thể áp dụng các phép biến đổi có chứa các biến đổi khác được lồng vào bên trong (được gọi trong Beam SDK)It’s recommended to create a new variable for each new 73 to sequentially transform input data. 84s can be used to create functions that contain other transforms (called in the Beam SDKs)How you apply your pipeline’s transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where 78 nodes are subroutines that accept 73 nodes as inputs and emit 73 nodes as outputs. For example, you can chain together transforms to create a pipeline that successively modifies input data. For example, you can successively call transforms on PCollections to modify the input data. 9 0 1 2The graph of this pipeline looks like the following Figure 1. A linear pipeline with three sequential transforms However, note that a transform does not consume or otherwise alter the input collection — remember that a 73 is immutable by definition. This means that you can apply multiple transforms to the same input 73 to create a branching pipeline, like so 3 4 5 6The graph of this branching pipeline looks like the following Figure 2. A branching pipeline. Two transforms are applied to a single PCollection of database table rows You can also build your own that nest multiple transforms inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places The pipe syntax allows one to apply PTransforms to 44s and 45s of PCollections as well for those transforms accepting multiple inputs (such as 46 and 47)PTransforms can also be applied to any 48, which include the Root object, PCollections, arrays of 48s, and objects with 48 values. One can apply transforms to these composite types by wrapping them with 51, e. g. 52PTransforms come in two flavors, synchronous and asynchronous, depending on whether their application* involves asynchronous invocations. An 53 must be applied with 54 and returns a 55 which must be awaited before further pipeline construction4. 2. Core Beam transformsBeam provides the following core transforms, each of which represents a different processing paradigm
The Typescript SDK provides some of the most basic of these transforms as methods on 73 itself4. 2. 1. ParDo 56 is a Beam transform for generic parallel processing. The 56 processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm. a 56 transform considers each element in the input 73, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output 73 56 is useful for a variety of common data processing operations, including
In such roles, 56 is a common intermediate step in a pipeline. You might use it to extract certain fields from a set of raw input records, or convert raw input into a different format; you might also use 56 to convert processed data into a format suitable for output, like database table rows or printable stringsWhen you apply a 56 transform, you’ll need to provide user code in the form of a 83 object. 83 is a Beam SDK class that defines a distributed processing function
All DoFns should be registered using a generic 86 function. This allows the Go SDK to infer an encoding from any inputs/outputs, registers the DoFn for execution on remote runners, and optimizes the runtime execution of the DoFns via reflection 74. 2. 1. 1. Applying ParDoLike all Beam transforms, you apply 56 by calling the 31 method on the input 73 and passing 56 as an argument, as shown in the following example code 91 applies the passed in 83 argument to the input 73, as shown in the following example code 8 9 0 1In the example, our input 73 contains 50 96 values. We apply a 56 transform that specifies a function ( 98) to compute the length of each string, and outputs the result to a new 73 of 00 01 values that stores the length of each word. 4. 2. 1. 2. Creating a DoFnĐối tượng 83 mà bạn chuyển đến 56 chứa logic xử lý được áp dụng cho các phần tử trong bộ sưu tập đầu vào. When you use Beam, often the most important pieces of code you’ll write are these 83s - they’re what define your pipeline’s exact data processing tasks
A 83 processes one element at a time from the input 73. Khi bạn tạo một lớp con của 83, bạn sẽ cần cung cấp các tham số loại phù hợp với loại của các phần tử đầu vào và đầu ra. If your 83 processes incoming 50 elements and produces 00 elements for the output collection (like our previous example, 98), your class declaration would look like thisA 83 processes one element at a time from the input 73. When you create a 83 struct, you’ll need to provide type parameters that match the types of the input and output elements in a ProcessElement method. Nếu 83 của bạn xử lý các phần tử 96 đến và tạo ra các phần tử 01 cho tập hợp đầu ra (như ví dụ trước của chúng tôi, 98), dofn của bạn có thể trông như thế này 2 3Inside your 83 subclass, you’ll write a method annotated with 12 where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your 12 method should accept a parameter tagged with 23, which will be populated with the input element. In order to output elements, the method can also take a parameter of type 24 which provides a method for emitting elements. The parameter types must match the input and output types of your 83 or the framework will raise an error. Ghi chú. 23 và 24 đã được giới thiệu trong Beam 2. 5. 0; Inside your 83 subclass, you’ll write a method 30 where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your 30 method should accept an argument 32, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with 33 statements. You can also use a 34 statement with an iterable, like a list or a generatorFor your 83 type, you’ll write a method 36 where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your 36 method should accept a parameter 32, which is the input element. In order to output elements, the method can also take a function parameter, which can be called to emit elements. The parameter types must match the input and output types of your 83 or the framework will raise an error 4 5 6 7Simple DoFns can also be written as functions 8
A given 83 instance generally gets invoked one or more times to process some arbitrary bundle of elements. However, Beam doesn’t guarantee an exact number of invocations; it may be invoked multiple times on a given worker node to account for failures and retries. As such, you can cache information across multiple calls to your processing method, but if you do so, make sure the implementation does not depend on the number of invocationsIn your processing method, you’ll also need to meet some immutability requirements to ensure that Beam and the processing back-end can safely serialize and cache the values in your pipeline. Your method should meet the following requirements
4. 2. 1. 3. Lightweight DoFns and other abstractionsIf your function is relatively straightforward, you can simplify your use of 56 by providing a lightweight 83 in-line, as an anonymous inner class instance a lambda function an anonymous function a function passed to 60 or 61 . Here’s the previous example, 56 with 63, with the 83 specified as an anonymous inner class instance a lambda function an anonymous function a function . 9 0 1 2If your 56 performs a one-to-one mapping of input elements to output elements–that is, for each input element, it applies a function that produces exactly one output element, you can return that element directly. you can use the higher-level 66 67 transform. 66 can accept an anonymous Java 8 lambda function for additional brevity. Here’s the previous example using 3 4 5 6
4. 2. 1. 4. DoFn lifecycleHere is a sequence diagram that shows the lifecycle of the DoFn during the execution of the ParDo transform. The comments give useful information to pipeline developers such as the constraints that apply to the objects or particular cases such as failover or instance reuse. They also give instantiation use cases 4. 2. 2. GroupByKey 14 is a Beam transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to 14 is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use 14 to collect all of the values associated with each unique key 14 is a good way to aggregate data that has something in common. For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (wherein the “key” of the key/value pair is the postal code field, and the “value” is the remainder of the record)Let’s examine the mechanics of 14 with a simple example case, where our data set consists of words from a text file and the line number on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where a particular word appearsOur input is a 73 of key/value pairs where each word is a key, and the value is a line number in the file where the word appears. Here’s a list of the key/value pairs in the input collection 7 14 gathers up all the values with the same key and outputs a new pair consisting of the unique key and a collection of all of the values that were associated with that key in the input collection. If we apply 14 to our input collection above, the output collection would look like this 8Thus, 14 represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values)Using While all SDKs have a 14 transform, using 87 is generally more natural. The 87 transform can be parameterized by the name(s) of properties on which to group the elements of the PCollection, or a function taking the each element as input that maps to a key on which to do grouping 9 0 1 24. 2. 2. 1 GroupByKey and unbounded PCollectionsIf you are using unbounded 73s, you must use either or an in order to perform a 14 or . This is because a bounded 14 or 47 must wait for all the data with a certain key to be collected, but with unbounded collections, the data is unlimited. Windowing and/or triggers allow grouping to operate on logical, finite bundles of data within the unbounded data streamsIf you do apply 14 or 47 to a group of unbounded 73s without setting either a non-global windowing strategy, a trigger strategy, or both for each collection, Beam generates an IllegalStateException error at pipeline construction timeWhen using 14 or 47 to group 73s that have a applied, all of the 73s you want to group must use the same windowing strategy and window sizing. For example, all of the collections you are merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 secondsIf your pipeline attempts to use 14 or 47 to merge 73s with incompatible windows, Beam generates an IllegalStateException error at pipeline construction time4. 2. 3. CoGroupByKey 47 performs a relational join of two or more key/value 73s that have the same key type. shows an example pipeline that uses a joinConsider using 47 if you have multiple data sets that provide information about related things. For example, let’s say you have two different files with user data. one file has names and email addresses; the other file has names and phone numbers. You can join those two data sets, using the user name as a common key and the other data as the associated values. After the join, you have one data set that contains all of the information (email addresses and phone numbers) associated with each nameIf you are using unbounded 73s, you must use either or an in order to perform a 47. See for more detailsIn the Beam SDK for Java, 47 accepts a tuple of keyed 73s ( 10) as input. For type safety, the SDK requires you to pass each 73 as part of a 12. You must declare a 13 for each input 73 in the 12 that you want to pass to 47. As output, 47 returns a 18, which groups values from all the input 73s by their common keys. Each key (all of type 20) will have a different 21, which is a map from 22 to 23. You can access a specific collection in an 21 object by using the 13 that you supplied with the initial collectionIn the Beam SDK for Python, 47 accepts a dictionary of keyed 73s as input. As output, 47 creates a single output 73 that contains one key/value tuple for each key in the input 73s. Each key’s value is a dictionary that maps each tag to an iterable of the values under they key in the corresponding 73In the Beam Go SDK, 47 accepts an arbitrary number of 73s as input. As output, 47 creates a single output 73 that groups each key with value iterator functions for each input 73. The iterator functions map to input 37 in the same order they were provided to the 47The following conceptual examples use two input collections to show the mechanics of 47The first set of data has a 40 called 41 and contains names and email addresses. The second set of data has a 40 called 43 and contains names and phone numbersThe first set of data contains names and email addresses. The second set of data contains names and phone numbers 3 4 5 6After 47, the resulting data contains all data associated with each unique key from any of the input collections 7 8 9 0The following code example joins the two 73s with 47, followed by a 56 to consume the result. Then, the code uses tags to look up and format data from each collectionVí dụ mã sau nối hai 73 với 47, theo sau là một 56 để sử dụng kết quả. The ordering of the 83 iterator parameters maps to the ordering of the 47 inputs 1 2 3 4The formatted data looks like this 5 6 7 84. 2. 4. Combine
15 has variants that work on entire 73s, and some that combine the values for each key in 73s of key/value pairs. Khi bạn áp dụng biến đổi 15, bạn phải cung cấp hàm chứa logic để kết hợp các phần tử hoặc giá trị. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and maxSimple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of 61 that has an accumulation type distinct from the input/output type. The associativity and commutativity of a 61 allows runners to automatically apply some optimizations
4. 2. 4. 1. Simple combinations using simple functionsThe following example code shows a simple combine function. Combining is done by modifying a grouping transform with the 9 0 1 2All Combiners should be registered using a generic 66 function. This allows the Go SDK to infer an encoding from any inputs/outputs, registers the Combiner for execution on remote runners, and optimizes the runtime execution of the Combiner via reflectionCombiner1 should be used when your accumulator, input, and output are all of the same type. It can be called with 67 where 68 is the type of the input/accumulator/outputCombiner2 should be used when your accumulator, input, and output are 2 distinct types. It can be called with 69 where 70 is the type of the accumulator and 71 is the other typeCombiner3 should be used when your accumulator, input, and output are 3 distinct types. It can be called with 72 where 70 is the type of the accumulator, 71 is the type of the input, and 75 is the type of the output4. 2. 4. 2. Advanced combinations using CombineFnFor more complex combine functions, you can define a subclass of 61. You should use a 61 if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account. A general combining operation consists of four operations. When you create a subclass of 61, you must provide four operations by overriding the corresponding methods.
The following example code shows how to define a 61 that computes a mean average 3 4 5 6
4. 2. 4. 3. Combining a PCollection into a single valueUse the global combine to transform all of the elements in a given 73 into a single value, represented in your pipeline as a new 73 containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a 73 of integers 7 8 9 04. 2. 4. 4. Combine and global windowingIf your input 73 uses the default global windowing, the default behavior is to return a 73 containing one item. That item’s value comes from the accumulator in the combine function that you specified when applying 15. For example, the Beam provided sum combine function returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite valueTo have 15 instead return an empty 73 if the input is empty, specify 89 when you apply your 15 transform, as in the following code example 1 2 3 44. 2. 4. 5. Combine and non-global windowingIf your 73 uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying 15
If your 73 uses any non-global windowing function, the Beam Go SDK behaves the same way as with global windowing. Windows that are empty in the input 73 will likewise be empty in the output collection4. 2. 4. 6. Combining values in a keyed PCollectionAfter creating a keyed PCollection (for example, by using a 14 transform), a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from 14, a key-grouped 73 called 03 looks like this 5In the above 73, each element has a string key (for example, “cat”) and an iterable of integers for its value (in the first element, containing [1, 5, 9]). If our pipeline’s next processing step combines the values (rather than considering them individually), you can combine the iterable of integers to create a single, merged value to be paired with each key. This pattern of a 14 followed by merging the collection of values is equivalent to Beam’s Combine PerKey transform. The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of 61. 6 7 8 94. 2. 5. Flatten
73 objects that store the same data type. 46 merges multiple 73 objects into a single logical 73. The following example shows how to apply a 46 transform to merge multiple 73 objects 00 01 02 034. 2. 5. 1. Data encoding in merged collectionsBy default, the coder for the output 73 is the same as the coder for the first 73 in the input 19. However, the input 73 objects can each use different coders, as long as they all contain the same data type in your chosen language4. 2. 5. 2. Merging windowed collectionsWhen using 46 to merge 73 objects that have a windowing strategy applied, all of the 73 objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you’re merging must all use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 secondsIf your pipeline attempts to use 46 to merge 73 objects with incompatible windows, Beam generates an 26 error when your pipeline is constructed4. 2. 6. Partition
73 objects that store the same data type. 61 splits a single 73 into a fixed number of smaller collections. Often in the Typescript SDK the 34 transform is more natural to use 61 divides the elements of a 73 according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input 73 into each resulting partition 73. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance)The following example divides a 73 into percentile groups 04 05 06 074. 3. Requirements for writing user code for Beam transformsWhen you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code In general, your user code must fulfill at least these requirements
In addition, it’s recommended that you make your function object idempotent. Non-idempotent functions are supported by Beam, but require additional thought to ensure correctness when there are external side effects
4. 3. 1. SerializabilityAny function object you provide to a transform must be fully serializable. This is because a copy of the function needs to be serialized and transmitted to a remote worker in your processing cluster. The base classes for user code, such as Some other serializability factors you should keep in mind are
4. 3. 2. Thread-compatibilityYour function object should be thread-compatible. Each instance of your function object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization. Note that static members in your function object are not passed to worker instances and that multiple instances of your function may be accessed from different threads. 4. 3. 3. IdempotenceIt’s recommended that you make your function object idempotent–that is, that it can be repeated or retried as often as necessary without causing unintended side effects. Non-idempotent functions are supported, however the Beam model provides no guarantees as to the number of times your user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline’s output deterministic, and your transforms’ behavior more predictable and easier to debug 4. 4. Side inputsIn addition to the main input 73, you can provide additional inputs to a 56 transform in the form of side inputs. A side input is an additional input that your 83 can access each time it processes an element in the input 73. When you specify a side input, you create a view of some other data that can be read from within the 56 transform’s 83 while processing each elementSide inputs are useful if your 56 needs to inject additional data when processing each element in the input 73, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipelineAll side input iterables should be registered using a generic 67 function. This optimizes runtime execution of the iterable4. 4. 1. Passing side inputs to ParDo 08 09 10 114. 4. 2. Side inputs and windowingA windowed 73 may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a 96 of a windowed 73, the 96 represents a single entity per window (one singleton per window, one list per window, etc. )Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element’s window into the side input’s window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. Tuy nhiên, nếu đầu vào có các cửa sổ khác nhau, Beam sẽ sử dụng phép chiếu để chọn cửa sổ đầu vào bên thích hợp nhất For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Beam projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window If the main input element exists in more than one window, then 72 gets called multiple times, once for each window. Each call to 72 projects the “current” window for the main input element, and thus might provide a different view of the side input each timeIf the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger 4. 5. Additional outputsWhile 56 always produces a main output 73 (as the return value from 31), you can also have your 56 produce any number of additional output 73s. If you choose to have multiple outputs, your 56 returns all of the output 73s (including the main output) bundled togetherWhile 91 always produces an output 73, your 83 can produce any number of additional output 37s, or even none at all. If you choose to have multiple outputs, your 83 needs to be called with the 56 function that matches the number of outputs. 87 for two output 73s, 89 for three and so on until 90. If you need more, you can use 91 which will return a 92While 56 always produces a main output 73 (as the return value from 31). If you want to have multiple outputs, emit an object with distinct properties in your 56 operation and follow this operation with a 34 to break it into multiple 73s4. 5. 1. Tags for multiple outputsThe 34 PTransform will take a PCollection of elements of the form 00 and return a object 01. The set of expected tags is passed to the operation; how multiple or unknown tags are handled can be specified by passing a non-default 02 instanceThe Go SDK doesn’t use output tags, and instead uses positional ordering for multiple output PCollections 12 13 14 154. 5. 2. Emitting to multiple outputs in your DoFnCall emitter functions as needed to produce 0 or more elements for its matching 73. The same value can be emitted with multiple emitters. As normal, do not mutate values after emitting them from any emitterAll emitters should be registered using a generic 04 function. This optimizes runtime execution of the emitterDoFns can also return a single element via the standard return. The standard return is always the first PCollection returned from beam. ParDo. Other emitters output to their own PCollections in their defined parameter order 16 17 18 194. 5. 3. Accessing additional parameters in your DoFnIn addition to the element and the 24, Beam will populate other parameters to your DoFn’s 12 method. Any combination of these parameters can be added to your process method in any orderIn addition to the element, Beam will populate other parameters to your DoFn’s 30 method. Any combination of these parameters can be added to your process method in any orderIn addition to the element, Beam will populate other parameters to your DoFn’s 30 method. These are available by placing accessors in the context argument, just as for side inputsIn addition to the element, Beam will populate other parameters to your DoFn’s 36 method. Any combination of these parameters can be added to your process method in a standard ordercontext. Context. To support consolidated logging and user defined metrics, a 10 parameter can be requested. Per Go conventions, if present it’s required to be the first parameter of the 83 method 20Timestamp. To access the timestamp of an input element, add a parameter annotated with 12 of type 13. For exampleTimestamp. To access the timestamp of an input element, add a keyword parameter default to 14. For exampleTimestamp. To access the timestamp of an input element, add a 15 parameter before the element. For exampleTimestamp. To access the window an input element falls into, add a 16 to the context argument 21 22 23 24Window. To access the window an input element falls into, add a parameter of the type of the window used for the input 73. If the parameter is a window type (a subclass of 18) that does not match the input 73, then an error will be raised. If an element falls in multiple windows (for example, this will happen when using 20), then the 12 method will be invoked multiple time for the element, once for each window. For example, when fixed windows are being used, the window is of type 22Window. To access the window an input element falls into, add a keyword parameter default to 23. If an element falls in multiple windows (for example, this will happen when using 20), then the 30 method will be invoked multiple time for the element, once for each windowWindow. Để truy cập cửa sổ có phần tử đầu vào, hãy thêm tham số 26 trước phần tử. If an element falls in multiple windows (for example, this will happen when using SlidingWindows), then the 36 method will be invoked multiple time for the element, once for each window. Since 26 is an interface it’s possible to type assert to the concrete implementation of the window. For example, when fixed windows are being used, the window is of type 29Window. To access the window an input element falls into, add a 16 to the context argument. If an element falls in multiple windows (for example, this will happen when using 20), then the function will be invoked multiple time for the element, once for each window 25 26 27 28PaneInfo. When triggers are used, Beam provides a 32 object that contains information about the current firing. Using 32 you can determine whether this is an early or a late firing, and how many times this window has already fired for this keyPaneInfo. When triggers are used, Beam provides a 34 object that contains information about the current firing. Using 34 you can determine whether this is an early or a late firing, and how many times this window has already fired for this key. This feature implementation in Python SDK is not fully completed; see more at Issue 17821PaneInfo. When triggers are used, Beam provides 36 object that contains information about the current firing. Using 36 you can determine whether this is an early or a late firing, and how many times this window has already fired for this keyWindow. To access the window an input element falls into, add a 38 to the context argument. Using 36 you can determine whether this is an early or a late firing, and how many times this window has already fired for this key 29 30 31 32Tùy chọn đường ống. The 13 for the current pipeline can always be accessed in a process method by adding it as a parameter 33Các phương thức 41 cũng có thể truy cập nhiều tham số này. Timestamp, Window, key, 13, 24, and 44 parameters can all be accessed in an 41 method. In addition, an 41 method can take a parameter of type 47 which tells whether the timer is based on event time or processing time. Timers are explained in more detail in the Timely (and Stateful) Processing with Apache Beam blog postTimer and State. In addition to aforementioned parameters, user defined Timer and State parameters can be used in a stateful DoFn. Timers and States are explained in more detail in the Timely (and Stateful) Processing with Apache Beam blog post Timer and State. User defined State parameters can be used in a stateful DoFn. Timers aren’t implemented in the Go SDK yet; see more at Issue 22737. Once implemented, user defined Timer parameters can be used in a stateful DoFn. Timers and States are explained in more detail in the Timely (and Stateful) Processing with Apache Beam blog post Timer and State. This feature isn’t yet implemented in the Typescript SDK, but we welcome contributions. In the meantime, Typescript pipelines wishing to use state and timers can do so using 34 35 364. 6. Composite transformsTransforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one 56, 15, 14, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understandThe Beam SDK comes packed with many useful composite transforms. See the API reference pages for a list of transforms
4. 6. 1. An example composite transformThe 51 transform in the WordCount example program is an example of a composite transform. 51 is a 78 subclass that consists of multiple nested transforms. In its 51 transform applies the following transform operations.
37 38 39 40
4. 6. 2. Tạo một biến đổi tổng hợpA PTransform in the Typescript SDK is simply a function that accepts and returns 48s such as 73sTo create your own composite transform, create a subclass of the 78 class and override the 54 method to specify the actual processing logic. You can then use this transform just as you would a built-in transform from the Beam SDKFor the 78 class type parameters, you pass the 73 types that your transform takes as input, and produces as output. To take multiple 73s as input, or produce multiple 73s as output, use one of the multi-collection types for the relevant type parameterTo create your own composite 78 call the 84 method on the current pipeline scope variable. Transforms passed this new sub- 84 will be a part of the same composite 78To be able to re-use your Composite, build it inside a normal Go function or method. This function is passed a scope and input PCollections, and returns any output PCollections it produces. Note. Such functions cannot be passed directly to 56 functionsThe following code sample shows how to declare a 78 that accepts a 73 of 50s for input, and outputs a 73 of 00s 41 42 39Within your 78 subclass, you’ll need to override the 54 method. The 54 method is where you add the processing logic for the 78. Your override of 54 must accept the appropriate type of input 73 as a parameter, and specify the output 73 as the return valueThe following code sample shows how to override 54 for the 90 class declared in the previous exampleThe following code sample shows how to call the 51 composite PTransform, adding it to your pipeline 44 42 46As long as you override the 54 method in your 78 subclass to accept the appropriate input 73(s) and return the corresponding output 73(s), you can include as many transforms as you want. These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK librariesYour composite 78s can include as many transforms as you want. These transforms can include core transforms, other composite transforms, or the transforms included in the Beam SDK libraries. They can also consume and return as many 73s as are necessaryYour composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times Note. The 54 method of a 78 is not meant to be invoked directly by the user of a transform. Instead, you should call the 31 method on the 73 itself, with the transform as an argument. This allows transforms to be nested within the structure of your pipeline4. 6. 3. PTransform Style GuideThe PTransform Style Guide contains additional information not included here, such as style guidelines, logging and testing guidance, and language-specific considerations. The guide is a useful starting point when you want to write new composite PTransforms 5. Pipeline I/OKhi bạn tạo một đường dẫn, bạn thường cần đọc dữ liệu từ một số nguồn bên ngoài, chẳng hạn như tệp hoặc cơ sở dữ liệu. Likewise, you may want your pipeline to output its result data to an external storage system. Beam provides read and write transforms for a number of common data storage types. If you want your pipeline to read from or write to a data storage format that isn’t supported by the built-in transforms, you can implement your own read and write transforms 5. 1. Reading input dataRead transforms read data from an external source and return a 73 representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new 73, though it will be most common at the start of your pipeline 47 48 495. 2. Writing output dataWrite transforms write the data in a 73 to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline’s final results. However, you can use a write transform to output a 73's data at any point in your pipeline 50 51 525. 3. File-based input and output data5. 3. 1. Reading from multiple locationsMany read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey filesystem-specific consistency models. The following TextIO example uses a glob operator ( 006) to read all matching input files that have prefix “input-” and the suffix “. csv” in the given location 53 54 55To read data from disparate sources into a single 73, read each one independently and then use the transform to create a single 735. 3. 2. Writing to multiple output filesFor file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can append a suffix to each output file by specifying a suffix The following write transform example writes multiple output files to a location. Each file has the prefix “numbers”, a numeric tag, and the suffix “. csv” 56 57 585. 4. Beam-provided I/O transformsXem trang Biến đổi I/O do Beam cung cấp để biết danh sách các biến đổi I/O hiện có 6. SchemasOften, the types of the records being processed have an obvious structure. Common Beam sources produce JSON, Avro, Protocol Buffer, or database row objects; all of these types have well defined structures, structures that can often be determined by examining the type. Even within a SDK pipeline, Simple Java POJOs (or equivalent structures in other languages) are often used as intermediate types, and these also have a clear structure that can be inferred by inspecting the class. By understanding the structure of a pipeline’s records, we can provide much more concise APIs for data processing 6. 1. What is a schema?Most structured records share some common characteristics
Often records have a nested structure. A nested structure occurs when a field itself has subfields so the type of the field itself has a schema. Fields that are array or map types is also a common feature of these structured records For example, consider the following schema, representing actions in a fictitious e-commerce company Purchase Field NameField TypeuserIdSTRINGitemIdINT64shippingAddressROW(ShippingAddress)costINT64transactionsARRAY[ROW(Transaction)] Địa chỉ giao hàng Field NameField TypestreetAddressSTRINGcitySTRINGstatenullable STRINGcountrySTRINGpostCodeSTRINGTransaction Field NameField TypebankSTRINGpurchaseAmountDOUBLEPurchase event records are represented by the above purchase schema. Each purchase event contains a shipping address, which is a nested row containing its own schema. Each purchase also contains an array of credit-card transactions (a list, because a purchase might be split across multiple credit cards); each item in the transaction list is a row with its own schema Điều này cung cấp một mô tả trừu tượng về các loại liên quan, một loại được trừu tượng hóa khỏi bất kỳ ngôn ngữ lập trình cụ thể nào Schemas provide us a type-system for Beam records that is independent of any specific programming-language type. There might be multiple Java classes that all have the same schema (for example a Protocol-Buffer class or a POJO class), and Beam will allow us to seamlessly convert between these types. Schemas also provide a simple way to reason about types across different programming-language APIs A 73 with a schema does not need to have a 66 specified, as Beam knows how to encode and decode Schema rows; Beam uses a special coder to encode schema types6. 2. Lược đồ cho các loại ngôn ngữ lập trìnhWhile schemas themselves are language independent, they are designed to embed naturally into the programming languages of the Beam SDK being used. This allows Beam users to continue using native types while reaping the advantage of having Beam understand their element schemas In Java you could use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class In Python you can use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class In Go, schema encoding is used by default for struct types, with Exported fields becoming part of the schema. Beam will automatically infer the schema based on the fields and field tags of the struct, and their order In Typescript, JSON objects are used to represent schema’d data. Unfortunately type information in Typescript is not propagated to the runtime layer, so it needs to be manually specified in some places (e. g. when using cross-language pipelines) 59 60 61 62Using JavaBean classes as above is one way to map a schema to Java classes. However multiple Java classes might have the same schema, in which case the different Java types can often be used interchangeably. Beam will add implicit conversions between types that have matching schemas. For example, the above 011 class has the same schema as the following class 63So if we had two 73s as follows 64Then these two 73s would have the same schema, even though their Java types would be different. This means for example the following two code snippets are valid 65and 66Even though the in both cases the 23 parameter differs from the 73's Java type, since the schemas are the same Beam will automatically make the conversion. The built-in 016 transform can also be used to translate between Java types of equivalent schemas, as detailed below6. 3. Schema definitionThe schema for a 73 defines elements of that 73 as an ordered list of named fields. Each field has a name, a type, and possibly a set of user options. The type of a field can be primitive or composite. The following are the primitive types currently supported by BeamTypeDescriptionBYTEAn 8-bit signed valueINT16A 16-bit signed valueINT32A 32-bit signed valueINT64A 64-bit signed valueDECIMALAn arbitrary-precision decimal typeFLOATA 32-bit IEEE 754 floating point numberDOUBLEA 64-bit IEEE 754 floating point numberSTRINGA stringDATETIMEA timestamp represented as milliseconds since the epochBOOLEANA boolean valueBYTESA raw byte arrayA field can also reference a nested schema. In this case, the field will have type ROW, and the nested schema will be an attribute of this field type Three collection types are supported as field types. ARRAY, ITERABLE and MAP
6. 4. Logical typesUsers can extend the schema type system to add custom logical types that can be used as a field. A logical type is identified by a unique identifier and an argument. A logical type also specifies an underlying schema type to be used for storage, along with conversions to and from that type. As an example, a logical union can always be represented as a row with nullable fields, where the user ensures that only one of those fields is ever set at a time. However this can be tedious and complex to manage. The OneOf logical type provides a value class that makes it easier to manage the type as a union, while still using a row with nullable fields as its underlying storage. Each logical type also has a unique identifier, so they can be interpreted by other languages as well. More examples of logical types are listed below 6. 4. 1. Defining a logical typeTo define a logical type you must specify a Schema type to be used to represent the underlying type as well as a unique identifier for that type. A logical type imposes additional semantics on top a schema type. For example, a logical type to represent nanosecond timestamps is represented as a schema containing an INT64 and an INT32 field. This schema alone does not say anything about how to interpret this type, however the logical type tells you that this represents a nanosecond timestamp, with the INT64 field representing seconds and the INT32 field representing nanoseconds Logical types are also specified by an argument, which allows creating a class of related types. For example, a limited-precision decimal type would have an integer argument indicating how many digits of precision are represented. The argument is represented by a schema type, so can itself be a complex type In Java, a logical type is specified as a subclass of the 020 class. A custom Java class can be specified to represent the logical type and conversion functions must be supplied to convert back and forth between this Java class and the underlying Schema type representation. For example, the logical type representing nanosecond timestamp might be implemented as followsIn Go, a logical type is specified with a custom implementation of the 021 interface. For example, the logical type provider representing nanosecond timestamps might be implemented as followsIn Typescript, a logical type defined by the LogicalTypeInfo interface which associates a logical type’s URN with its representation and its conversion to and from this representation 67 68 696. 4. 2. Useful logical typesCurrently the Python SDK provides minimal convenience logical types, other than to handle 022Currently the Go SDK provides minimal convenience logical types, other than to handle additional integer primitives, and 023EnumerationTypeThis convenience builder doesn’t yet exist for the Python SDK This convenience builder doesn’t yet exist for the Go SDK This logical type allows creating an enumeration type consisting of a set of named constants 70The value of this field is stored in the row as an INT32 type, however the logical type defines a value type that lets you access the enumeration either as a string or a value. For example 71Given a row object with an enumeration field, you can also extract the field as the enumeration value 72Automatic schema inference from Java POJOs and JavaBeans automatically converts Java enums to EnumerationType logical types OneOfTypeThis convenience builder doesn’t yet exist for the Python SDK This convenience builder doesn’t yet exist for the Go SDK OneOfType allows creating a disjoint union type over a set of schema fields. For example 73The value of this field is stored in the row as another Row type, where all the fields are marked as nullable. The logical type however defines a Value object that contains an enumeration value indicating which field was set and allows getting just that field 74In the above example we used the field names in the switch statement for clarity, however the enum integer values could also be used 6. 5. Creating SchemasIn order to take advantage of schemas, your 73s must have a schema attached to it. Often, the source itself will attach a schema to the PCollection. For example, when using 025 to read Avro files, the source can automatically infer a Beam schema from the Avro schema and attach that to the Beam 73. However not all sources produce schemas. In addition, often Beam pipelines have intermediate stages and types, and those also can benefit from the expressiveness of schemas6. 5. 1. Inferring schemasThích ứng cho
Unfortunately, Beam is unable to access Typescript’s type information at runtime. Schemas must be manually declared with 027. On the other hand, schema-aware operations such as 87 can be used without an explicit schema declaredBeam is able to infer schemas from a variety of common Java types. The 029 annotation can be used to tell Beam to infer schemas from a specific type. Chú thích lấy một đối số là 030 và các lớp 030 đã được tích hợp sẵn cho các loại Java phổ biến. The 032 can also be invoked programmatically for cases where it is not practical to annotate the Java type itselfJava POJOs A POJO (Plain Old Java Object) is a Java object that is not bound by any restriction other than the Java Language Specification. A POJO can contain member variables that are primitives, that are other POJOs, or are collections maps or arrays thereof. POJOs do not have to extend prespecified classes or extend any specific interfaces If a POJO class is annotated with 033, Beam will automatically infer a schema for this class. Nested classes are supported as are classes with 83, array, and 67 fieldsFor example, annotating the following class tells Beam to infer a schema from this POJO class and apply it to any 036 75The 037 annotation tells Beam that this constructor can be used to create instances of TransactionPojo, assuming that constructor parameters have the same names as the field names. 037 can also be used to annotate static factory methods on the class, allowing the constructor to remain private. If there is no 037 annotation then all the fields must be non-final and the class must have a zero-argument constructorThere are a couple of other useful annotations that affect how Beam infers schemas. By default the schema field names inferred will match that of the class field names. However 040 can be used to specify a different name to be used for the schema field. 041 can be used to mark specific class fields as excluded from the inferred schema. For example, it’s common to have ephemeral fields in a class that should not be included in a schema (e. g. caching the hash value to prevent expensive recomputation of the hash), and 041 can be used to exclude these fields. Note that ignored fields will not be included in the encoding of these recordsIn some cases it is not convenient to annotate the POJO class, for example if the POJO is in a different package that is not owned by the Beam pipeline author. In these cases the schema inference can be triggered programmatically in pipeline’s main function as follows 76Java Beans Java Beans are a de-facto standard for creating reusable property classes in Java. While the full standard has many characteristics, the key ones are that all properties are accessed via getter and setter classes, and the name format for these getters and setters is standardized. A Java Bean class can be annotated with 043 and Beam will automatically infer a schema for this class. For example 77The 037 annotation can be used to specify a constructor or a static factory method, in which case the setters and zero-argument constructor can be omitted 78Có thể sử dụng 040 và 041 để thay đổi lược đồ được suy ra, giống như với các lớp POJOAutoValue Java value classes are notoriously difficult to generate correctly. There is a lot of boilerplate you must create in order to properly implement a value class. AutoValue là một thư viện phổ biến để dễ dàng tạo các lớp như vậy bằng cách triển khai một lớp cơ sở trừu tượng đơn giản Beam có thể suy ra lược đồ từ lớp AutoValue. Ví dụ 79This is all that’s needed to generate a simple AutoValue class, and the above 029 annotation tells Beam to infer a schema from it. This also allows AutoValue elements to be used inside of 73s 040 and 041 can be used to alter the schema inferredBeam has a few different mechanisms for inferring schemas from Python code NamedTuple classes A class is a Python class that wraps a 44, assigning a name to each element and restricting it to a particular type. Beam will automatically infer the schema for PCollections with 052 output types. For example 80beam. Row and Select There are also methods for creating ad-hoc schema declarations. First, you can use a lambda that returns instances of 053 81Sometimes it can be more concise to express the same logic with the transform 82Note that these declaration don’t include any specific information about the types of the 055 and 056 fields, so Beam will attempt to infer type information. If it’s unable to it will fall back to the generic type 057. Sometimes this is not ideal, you can use casts to make sure Beam correctly infers types with 053 or with 054 83Beam currently only infers schemas for exported fields in Go structs Structs Beam will automatically infer schemas for all Go structs used as PCollection elements, and default to encoding them using schema encoding 84Unexported fields are ignored, and cannot be automatically inferred as part of the schema. Fields of type func, channel, unsafe. Pointer, or uintptr will be ignored by inference. Fields of interface types are ignored, unless a schema provider is registered for them By default, schema field names will match the exported struct field names. In the above example, “Bank” and “PurchaseAmount” are the schema field names. A schema field name can be overridden with a struct tag for the field 85Overriding schema field names is useful for compatibility cross language transforms, as schema fields may have different requirements or restrictions from Go exported fields 6. 6. Using Schema TransformsA schema on a 73 enables a rich variety of relational transforms. Thực tế là mỗi bản ghi bao gồm các trường được đặt tên cho phép các tập hợp đơn giản và dễ đọc, tham chiếu các trường theo tên, tương tự như các tập hợp trong biểu thức SQLBeam does not yet support Schema transforms natively in Go. However, it will be implemented with the following behavior 6. 6. 1. Field selection syntaxThe advantage of schemas is that they allow referencing of element fields by name. Beam provides a selection syntax for referencing fields, including nested and repeated fields. This syntax is used by all of the schema transforms when referencing the fields they operate on. The syntax can also be used inside of a DoFn to specify which schema fields to process Addressing fields by name still retains type safety as Beam will check that schemas match at the time the pipeline graph is constructed. If a field is specified that does not exist in the schema, the pipeline will fail to launch. In addition, if a field is specified with a type that does not match the type of that field in the schema, the pipeline will fail to launch The following characters are not allowed in field names. . * [ ] { } Top-level fieldsIn order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a 73 of purchases one would write (using the 054 transform) 86 87Nested fieldsSupport for Nested fields hasn’t been developed for the Python SDK yet Support for Nested fields hasn’t been developed for the Go SDK yet Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address one would write 88WildcardsSupport for wildcards hasn’t been developed for the Python SDK yet Support for wildcards hasn’t been developed for the Go SDK yet The * operator can be specified at any nesting level to represent all fields at that level. For example, to select all shipping-address fields one would write 89ArraysAn array field, where the array element type is a row, can also have subfields of the element type addressed. When selected, the result is an array of the selected subfield type. For example Support for Array fields hasn’t been developed for the Python SDK yet Support for Array fields hasn’t been developed for the Go SDK yet 90Will result in a row containing an array field with element-type string, containing the list of banks for each transaction While the use of [] brackets in the selector is recommended, to make it clear that array elements are being selected, they can be omitted for brevity. In the future, array slicing will be supported, allowing selection of portions of the array MapsA map field, where the value type is a row, can also have subfields of the value type addressed. When selected, the result is a map where the keys are the same as in the original map but the value is the specified type. Similar to arrays, the use of {} curly brackets in the selector is recommended, to make it clear that map value elements are being selected, they can be omitted for brevity. In the future, map key selectors will be supported, allowing selection of specific keys from the map. For example, given the following schema PurchasesByType Field NameField TypepurchasesMAP{STRING, ROW{PURCHASE}Sau đây 91Hỗ trợ cho các trường Bản đồ chưa được phát triển cho Python SDK Hỗ trợ cho các trường Bản đồ chưa được phát triển cho Go SDK Sẽ dẫn đến một hàng chứa trường bản đồ với chuỗi loại khóa và chuỗi loại giá trị. Bản đồ đã chọn sẽ chứa tất cả các khóa từ bản đồ gốc và các giá trị sẽ là userId có trong bản ghi mua hàng Mặc dù nên sử dụng dấu ngoặc {} trong bộ chọn, nhưng để làm rõ rằng các phần tử giá trị bản đồ đang được chọn, chúng có thể được bỏ qua cho ngắn gọn. Trong tương lai, tính năng cắt bản đồ sẽ được hỗ trợ, cho phép chọn các phím cụ thể từ bản đồ 6. 6. 2. Biến đổi lược đồBeam cung cấp một tập hợp các biến đổi hoạt động tự nhiên trên các lược đồ. Các biến đổi này rất biểu cảm, cho phép lựa chọn và tổng hợp theo các trường lược đồ được đặt tên. Sau đây là một số ví dụ về biến đổi lược đồ hữu ích Chọn đầu vàoThông thường, một phép tính chỉ quan tâm đến một tập hợp con các trường trong đầu vào 73. Biến đổi 054 cho phép một người dễ dàng chỉ chiếu ra các trường quan tâm. Kết quả 73 có một lược đồ chứa từng trường được chọn dưới dạng trường cấp cao nhất. Có thể chọn cả trường cấp cao nhất và trường lồng nhau. Ví dụ: trong lược đồ Mua hàng, người ta chỉ có thể chọn các trường userId và streetAddress như sau 92Support for Nested fields hasn’t been developed for the Python SDK yet Support for Nested fields hasn’t been developed for the Go SDK yet Kết quả 73 sẽ có lược đồ sauField NameField TypeuserIdSTRINGstreetAddressSTRINGĐiều này cũng đúng với các lựa chọn ký tự đại diện. Sau đây 93Support for Wildcards hasn’t been developed for the Python SDK yet Support for Wildcards hasn’t been developed for the Go SDK yet Will result in the following schema Field NameField TypeuserIdSTRINGstreetAddressSTRINGcitySTRINGstatenullable STRINGcountrySTRINGpostCodeSTRINGWhen selecting fields nested inside of an array, the same rule applies that each selected field appears separately as a top-level field in the resulting row. This means that if multiple fields are selected from the same nested row, each selected field will appear as its own array field. For example 94Support for nested fields hasn’t been developed for the Python SDK yet Support for nested fields hasn’t been developed for the Go SDK yet Will result in the following schema Field NameField TypebankARRAY[STRING]purchaseAmountARRAY[DOUBLE]Wildcard selections are equivalent to separately selecting each field Selecting fields nested inside of maps have the same semantics as arrays. If you select multiple fields from a map , then each selected field will be expanded to its own map at the top level. This means that the set of map keys will be copied, once for each selected field Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the 067 builder method can be used to provide an alternate name for the selected fieldAnother use of the Select transform is to flatten a nested schema into a single flat schema. For example 95Support for nested fields hasn’t been developed for the Python SDK yet Support for nested fields hasn’t been developed for the Go SDK yet Will result in the following schema Field NameField TypeuserIdSTRINGitemIdSTRINGshippingAddress_streetAddressSTRINGshippingAddress_citynullable STRINGshippingAddress_stateSTRINGshippingAddress_countrySTRINGshippingAddress_postCodeSTRINGcostCentsINT64transactions_bankARRAY[STRING]transactions_purchaseAmountARRAY[DOUBLE] Grouping aggregationsThe 068 transform allows simply grouping data by any number of fields in the input schema, applying aggregations to those groupings, and storing the result of those aggregations in a new schema field. The output of the 068 transform has a schema with one field corresponding to each aggregation performedThe 87 transform allows simply grouping data by any number of fields in the input schema, applying aggregations to those groupings, and storing the result of those aggregations in a new schema field. The output of the 87 transform has a schema with one field corresponding to each aggregation performedThe simplest usage of 068 specifies no aggregations, in which case all inputs matching the provided set of fields are grouped together into an 073 field. For exampleThe simplest usage of 87 specifies no aggregations, in which case all inputs matching the provided set of fields are grouped together into an 073 field. For example 96 97Support for schema-aware grouping hasn’t been developed for the Go SDK yet The output schema of this is Field NameField TypekeyROW{userId. STRING, bank. STRING}valuesITERABLE[ROW[Purchase]]The key field contains the grouping key and the values field contains a list of all the values that matched that key The names of the key and values fields in the output schema can be controlled using this withKeyField and withValueField builders, as follows 98It is quite common to apply one or more aggregations to the grouped result. Each aggregation can specify one or more fields to aggregate, an aggregation function, and the name of the resulting field in the output schema. For example, the following application computes three aggregations grouped by userId, with all aggregations represented in a single output schema 99 00Support for schema-aware grouping hasn’t been developed for the Go SDK yet The result of this aggregation will have the following schema Field NameField TypekeyROW{userId. STRING}valueROW{numPurchases. INT64, totalSpendCents. INT64, topPurchases. ARRAY[INT64]}Often 076 will be use to flatten the result into a non-nested, flat schemaJoinsBeam supports equijoins on schema 37 - namely joins where the join condition depends on the equality of a subset of fields. For example, the following examples uses the Purchases schema to join transactions with the reviews that are likely associated with that transaction (both the user and product match that in the transaction). This is a “natural join” - one in which the same field names are used on both the left-hand and right-hand sides of the join - and is specified with the 078 keywordSupport for joins hasn’t been developed for the Python SDK yet Support for joins hasn’t been developed for the Go SDK yet 01The resulting schema is the following Field NameField TypelhsROW{Transaction}rhsROW{Review}Each resulting row contains one Transaction and one Review that matched the join condition If the fields to match in the two schemas have different names, then the on function can be used. For example, if the Review schema named those fields differently than the Transaction schema, then we could write the following Support for joins hasn’t been developed for the Python SDK yet Support for joins hasn’t been developed for the Go SDK yet 02In addition to inner joins, the Join transform supports full outer joins, left outer joins, and right outer joins Complex joinsWhile most joins tend to be binary joins - joining two inputs together - sometimes you have more than two input streams that all need to be joined on a common key. The 079 transform allows joining multiple 37 together based on equality of schema fields. Each 73 can be marked as required or optional in the final join record, providing a generalization of outer joins to joins with greater than two input 73s. Đầu ra có thể được mở rộng tùy chọn - cung cấp các bản ghi được nối riêng lẻ, như trong biến đổi 083. The output can also be processed in unexpanded format - providing the join key along with Iterables of all records from each input that matched that keySupport for joins hasn’t been developed for the Python SDK yet Support for joins hasn’t been developed for the Go SDK yet Filtering eventsThe 71 transform can be configured with a set of predicates, each one based one specified fields. Only records for which all predicates return true will pass the filter. For example the following 03Will produce all purchases made from Germany with a purchase price of greater than twenty cents Adding fields to a schemaThe AddFields transform can be used to extend a schema with new fields. Các hàng đầu vào sẽ được mở rộng sang lược đồ mới bằng cách chèn các giá trị null cho các trường mới, mặc dù có thể chỉ định các giá trị mặc định thay thế; . Có thể thêm các trường con lồng nhau bằng cách sử dụng cú pháp chọn trường, bao gồm các trường lồng nhau bên trong các mảng hoặc giá trị bản đồ For example, the following application 04Results in a 73 with an expanded schema. All of the rows and fields of the input, but also with the specified fields added to the schema. All resulting rows will have null values filled in for the timeOfDaySeconds and the shippingAddress. deliveryNotes fields, and a false value filled in for the transactions. isFlagged fieldRemoving fields from a schema 086 allows specific fields to be dropped from a schema. Input rows will have their schemas truncated, and any values for dropped fields will be removed from the output. Nested fields can also be dropped using the field selection syntaxFor example, the following snippet 05Results in a copy of the input with those two fields and their corresponding values removed Renaming schema fields 087 allows specific fields in a schema to be renamed. The field values in input rows are left unchanged, only the schema is modified. This transform is often used to prepare records for output to a schema-aware sink, such as an RDBMS, to make sure that the 73 schema field names match that of the output. It can also be used to rename fields generated by other transforms to make them more usable (similar to SELECT AS in SQL). Nested fields can also be renamed using the field-selection syntaxFor example, the following snippet 06Results in the same set of unmodified input elements, however the schema on the PCollection has been changed to rename userId to userIdentifier and shippingAddress. streetAddress to shippingAddress. street Converting between typesAs mentioned, Beam can automatically convert between different Java types, as long as those types have equivalent schemas. One way to do this is by using the 016 transform, as follows 07Beam will validate that the inferred schema for 090 matches that of the input 73, and will then cast to a 092Since the 093 class can support any schema, any 73 with schema can be cast to a 73 of rows, as follows 08If the source type is a single-field schema, Convert will also convert to the type of the field if asked, effectively unboxing the row. For example, give a schema with a single INT64 field, the following will convert it to a 096 09In all cases, type checking is done at pipeline graph construction, and if the types do not match the schema then the pipeline will fail to launch 6. 6. 3. Lược đồ trong ParDoA 73 with a schema can apply a 56, just like any other 73. However the Beam runner is aware of schemas when applying a 56, which enables additional functionalityInput conversionBeam does not yet support input conversion in Go Since Beam knows the schema of the source 73, it can automatically convert the elements to any Java type for which a matching schema is known. For example, using the above-mentioned Transaction schema, say we have the following 73 10If there were no schema, then the applied 83 would have to accept an element of type 104. However since there is a schema, you could apply the following DoFn 11Even though the 23 parameter does not match the Java type of the 73, since it has a matching schema Beam will automatically convert elements. If the schema does not match, Beam will detect this at graph-construction time and will fail the job with a type errorSince every schema can be represented by a Row type, Row can also be used here 12Input selectionSince the input has a schema, you can also automatically select specific fields to process in the DoFn Given the above purchases 73, say you want to process just the userId and the itemId fields. You can do these using the above-described selection expressions, as follows 13You can also select nested fields, as follows 14For more information, see the section on field-selection expressions. When selecting subschemas, Beam will automatically convert to any matching schema type, just like when reading the entire row 7. Data encoding and type safetyThích ứng cho
When Beam runners execute your pipeline, they often need to materialize the intermediate data in your 73s, which requires converting elements to and from byte strings. The Beam SDKs use objects called 66s to describe how the elements of a given 73 may be encoded and decoded
In the Beam SDK for Java, the type 66 provides the methods required for encoding and decoding data. The SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, StringUtf8 and more. You can find all of the available Coder subclasses in the Coder packageIn the Beam SDK for Python, the type 66 provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the apache_beam. coders packageStandard Go types like 01, 116 117, 118, and 96 and more are coded using builtin coders. Structs and pointers to structs default using Beam Schema Row encoding. However, users can build and register custom coders with 120. You can find available Coder functions in the coder packageStandard Typescript types like 121, 122 and 96 and more are coded using builtin coders. Json objects and arrays are encoded via a BSON encoding. For these types, coders need not be specified unless interacting with cross-language transforms. Users can build custom coders by extending 124 for use with 125, but generally logical types are preferred for this case
7. 1. Specifying codersThe Beam SDKs require a coder for every 73 in your pipeline. In most cases, the Beam SDK is able to automatically infer a 66 for a 73 based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a 66 explicitly, or develop a 66 for their custom typeYou can explicitly set the coder for an existing 73 by using the method 132. Note that you cannot call 133 on a 73 that has been finalized (e. g. by calling 135 on it)You can get the coder for an existing 73 by using the method 137. This method will fail with an 26 if a coder has not been set and cannot be inferred for the given 73Beam SDKs use a variety of mechanisms when attempting to automatically infer the 66 for a 73Each pipeline object has a 142. The 142 represents a mapping of Java types to the default coders that the pipeline should use for 73s of each typeThe Beam SDK for Python has a 142 that represents a mapping of Python types to the default coder that should be used for 73s of each typeThe Beam SDK for Go allows users to register default coder implementations with 120By default, the Beam SDK for Java automatically infers the 66 for the elements of a 73 produced by a 78 using the type parameter from the transform’s function object, such as 83. In the case of 56, for example, a 153 function object accepts an input element of type 00 and produces an output element of type 50. In such a case, the SDK for Java will automatically infer the default 66 for the output 157 (in the default pipeline 142, this is 159)By default, the Beam SDK for Python automatically infers the 66 for the elements of an output 73 using the typehints from the transform’s function object, such as 83. In the case of 56, for example a 83 with the typehints 165 and 166 accepts an input element of type int and produces an output element of type str. In such a case, the Beam SDK for Python will automatically infer the default 66 for the output 73 (in the default pipeline 142, this is 170)By default, the Beam SDK for Go automatically infers the 66 for the elements of an output 73 by the output of the transform’s function object, such as a 83. In the case of 56, for example a 83 with the parameters of 176 accepts an input element of type 01 and produces an output element of type 96. In such a case, the Beam SDK for Go will automatically infer the default 66 for the output 73 to be the 181 coder
Khi sử dụng _______9_______94, cách đơn giản nhất để đảm bảo rằng bạn có bộ mã hóa chính xác là gọi ___54_______186 khi bạn áp dụng phép biến đổi _________94 7. 2. Default coders and the CoderRegistryEach Pipeline object has a 142 object, which maps language types to the default coder the pipeline should use for those types. You can use the 142 yourself to look up the default coder for a given type, or to register a new default coder for a given type 142 contains a default mapping of coders to standard JavaPython types for any pipeline you create using the Beam SDK for JavaPython . The following table shows the standard mapping. Java TypeDefault CoderDoubleDoubleCoderInstantInstantCoderIntegerVarIntCoderIterableIterableCoderKVKvCoderListListCoderMapMapCoderLongVarLongCoderStringStringUtf8CoderTableRowTableRowJsonCoderVoidVoidCoderbyte[ ]ByteArrayCoderTimestampedValueTimestampedValueCoder Python TypeDefault CoderintVarIntCoderfloatFloatCoderstrBytesCoderbytesStrUtf8CoderTupleTupleCoder7. 2. 1. Looking up a default coderYou can use the method 191 to determine the default Coder for a Java type. You can access the 142 for a given pipeline by using the method 193. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis. i. e. “for this pipeline, verify that Integer values are encoded using 194. ”You can use the method 195 to determine the default Coder for a Python type. You can use 196 to access the 142. This allows you to determine (or set) the default Coder for a Python typeYou can use the 198 function to determine the default Coder for a Go type7. 2. 2. Setting the default coder for a typeTo set the default Coder for a JavaPython type for a particular pipeline, you obtain and modify the pipeline’s 142. You use the method 193 196 to get the 142 object, and then use the method 203 204 to register a new 66 for the target type. To set the default Coder for a Go type you use the function 120 to register a encoder and decoder functions for the target type. However, built in types like 01, 96, 117, etc cannot have their coders overrideThe following example code demonstrates how to set a default Coder, in this case 194, for Integerint values for a pipeline. The following example code demonstrates how to set a custom Coder for 211 elements 15 16 177. 2. 3. Chú thích một loại dữ liệu tùy chỉnh với một bộ mã hóa mặc địnhNếu chương trình đường ống của bạn xác định loại dữ liệu tùy chỉnh, bạn có thể sử dụng chú thích 212 để chỉ định bộ mã hóa sẽ sử dụng với loại đó. Theo mặc định, Beam sẽ sử dụng 213 sử dụng tuần tự hóa Java, nhưng nó có nhược điểm
Bạn có thể sử dụng chú thích 212 để đặt mặc định mới như sau 18Nếu bạn đã tạo một bộ mã hóa tùy chỉnh để phù hợp với loại dữ liệu của mình và bạn muốn sử dụng chú thích 212, thì lớp bộ mã hóa của bạn phải triển khai một phương thức xuất xưởng tĩnh 216 19The Beam SDK for PythonGo does not support annotating data types with a default coder. Nếu bạn muốn đặt bộ mã hóa mặc định, hãy sử dụng phương pháp được mô tả trong phần trước, Đặt bộ mã hóa mặc định cho loại. 8. WindowingWindowing subdivides a 73 according to the timestamps of its individual elements. Transforms that aggregate multiple elements, such as 14 and 15, work implicitly on a per-window basis — they process each 73 as a succession of multiple, finite windows, though the entire collection itself may be of unbounded sizeA related concept, called triggers, determines when to emit the results of aggregation as unbounded data arrives. You can use triggers to refine the windowing strategy for your 73. Triggers allow you to deal with late-arriving data or to provide early results. See the section for more information8. 1. Windowing basicsSome Beam transforms, such as 14 and 15, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the elements that have the same key within the entire data set. With an unbounded data set, it is impossible to collect all of the elements, since new elements are constantly being added and may be infinitely many (e. g. streaming data). If you are working with unbounded 73s, windowing is especially usefulIn the Beam model, any 73 (including unbounded 73s) can be subdivided into logical windows. Each element in a 73 is assigned to one or more windows according to the 73's windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each 73's elements on a per-window basis. 14, for example, implicitly groups the elements of a 73 by key and windowCaution. Beam’s default windowing behavior is to assign all elements of a 73 to a single, global window and discard late data, even for unbounded 73s. Before you use a grouping transform such as 14 on an unbounded 73, you must do at least one of the following
If you don’t set a non-global windowing function or a non-default trigger for your unbounded 73 and subsequently use a grouping transform such as 14 or 15, your pipeline will generate an error upon construction and your job will fail8. 1. 1. Windowing constraintsAfter you set the windowing function for a 73, the elements’ windows are used the next time you apply a grouping transform to that 73. Window grouping occurs on an as-needed basis. Nếu bạn đặt chức năng tạo cửa sổ bằng cách sử dụng biến đổi 241, thì mỗi phần tử được gán cho một cửa sổ, nhưng các cửa sổ đó không được xem xét cho đến khi 14 hoặc 15 tổng hợp trên một cửa sổ và phím. This can have different effects on your pipeline. Consider the example pipeline in the figure belowFigure 3. Pipeline applying windowing In the above pipeline, we create an unbounded 73 by reading a set of key/value pairs using 245, and then apply a windowing function to that collection using the 241 transform. We then apply a 56 to the collection, and then later group the result of that 56 using 14. The windowing function has no effect on the 56 transform, because the windows are not actually used until they’re needed for the 14. Subsequent transforms, however, are applied to the result of the 14 – data is grouped by both key and window8. 1. 2. Windowing with bounded PCollectionsYou can use windowing with fixed-size data sets in bounded 73s. However, note that windowing considers only the implicit timestamps attached to each element of a 73, and data sources that create fixed data sets (such as 255) assign the same timestamp to every element. This means that all the elements are by default part of a single, global windowTo use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a 56 transform with a 83 that outputs each element with a new timestamp (for example, the WithTimestamps transform in the Beam SDK for Java)To illustrate how windowing with a bounded 73 can affect how your pipeline processes data, consider the following pipelineFigure 4. 14 and 56 without windowing, on a bounded collectionIn the above pipeline, we create a bounded 73 by reading lines from a file using 255. We then group the collection using 14, and apply a 56 transform to the grouped 73. In this example, the 14 creates a collection of unique keys, and then 56 gets applied exactly once per keyNote that even if you don’t set a windowing function, there is still a window – all elements in your 73 are assigned to a single global windowNow, consider the same pipeline, but using a windowing function Figure 5. 14 and 56 with windowing, on a bounded collectionAs before, the pipeline creates a bounded 73 by reading lines from a file. We then set a for that 73. The 14 transform groups the elements of the 73 by both key and window, based on the windowing function. The subsequent 56 transform gets applied multiple times per key, once for each window8. 2. Provided windowing functionsYou can define different kinds of windows to divide the elements of your 73. Beam provides several windowing functions, including
You can also define your own 42 if you have a more complex needNote that each element can logically belong to more than one window, depending on the windowing function you use. Sliding time windowing, for example, can create overlapping windows wherein a single element can be assigned to multiple windows. However, each element in a 73 can only be in one window, so if an element is assigned to multiple windows, the element is conceptually duplicated into each of the windows and each element is identical except for its window8. 2. 1. Fixed time windowsThe simplest form of windowing is using fixed time windows. given a timestamped 73 which might be continuously updating, each window might capture (for example) all elements with timestamps that fall into a 30 second intervalA fixed time window represents a consistent duration, non overlapping time interval in the data stream. Consider windows with a 30 second duration. all of the elements in your unbounded 73 with timestamp values from 0. 00. 00 up to (but not including) 0. 00. 30 belong to the first window, elements with timestamp values from 0. 00. 30 up to (but not including) 0. 01. 00 belong to the second window, and so onFigure 6. Fixed time windows, 30s in duration 8. 2. 2. Sliding time windowsA sliding time window also represents time intervals in the data stream; however, sliding time windows can overlap. For example, each window might capture 60 seconds worth of data, but a new window starts every 30 seconds. The frequency with which sliding windows begin is called the period. Therefore, our example would have a window duration of 60 seconds and a period of 30 seconds Because multiple windows overlap, most elements in a data set will belong to more than one window. This kind of windowing is useful for taking running averages of data; using sliding time windows, you can compute a running average of the past 60 seconds’ worth of data, updated every 30 seconds, in our example Figure 7. Sliding time windows, with 1 minute window duration and 30s window period 8. 2. 3. Session windowsA session window function defines windows that contain elements that are within a certain gap duration of another element. Session windowing applies on a per-key basis and is useful for data that is irregularly distributed with respect to time. For example, a data stream representing user mouse activity may have long periods of idle time interspersed with high concentrations of clicks. If data arrives after the minimum specified gap duration time, this initiates the start of a new window Figure 8. Session windows, with a minimum gap duration. Lưu ý cách mỗi khóa dữ liệu có các cửa sổ khác nhau, theo phân phối dữ liệu của nó 8. 2. 4. The single global windowBy default, all data in a 73 is assigned to the single global window, and late data is discarded. If your data set is of a fixed size, you can use the global window default for your 73You can use the single global window if you are working with an unbounded data set (e. g. from a streaming data source) but use caution when applying aggregating transforms such as 14 and 15. The single global window with a default trigger generally requires the entire data set to be available before processing, which is not possible with continuously updating data. To perform aggregations on an unbounded 73 that uses global windowing, you should specify a non-default trigger for that 738. 3. Setting your PCollection’s windowing functionYou can set the windowing function for a 73 by applying the 241 transform. When you apply the 241 transform, you must provide a 42. The 42 determines the windowing function your 73 will use for subsequent grouping transforms, such as a fixed or sliding time windowWhen you set a windowing function, you may also want to set a trigger for your 73. The trigger determines when each individual window is aggregated and emitted, and helps refine how the windowing function performs with respect to late data and computing early results. See the section for more information8. 3. 1. Fixed-time windowsThe following example code shows how to apply 241 to divide a 73 into fixed windows, each 60 seconds in length 20 21 22 238. 3. 2. Sliding time windowsThe following example code shows how to apply 241 to divide a 73 into sliding time windows. Each window is 30 seconds in length, and a new window begins every five seconds 24 25 26 278. 3. 3. Session windowsThe following example code shows how to apply 241 to divide a 73 into session windows, where each session must be separated by a time gap of at least 10 minutes (600 seconds) 28 29 30 31Lưu ý rằng các phiên là theo từng khóa — mỗi khóa trong bộ sưu tập sẽ có các nhóm phiên riêng tùy thuộc vào phân phối dữ liệu 8. 3. 4. Cửa sổ toàn cầu duy nhấtNếu 73 của bạn bị giới hạn (kích thước cố định), bạn có thể gán tất cả các phần tử cho một cửa sổ chung duy nhất. Mã ví dụ sau đây cho biết cách đặt một cửa sổ chung cho một 73 32 33 34 358. 4. Hình mờ và dữ liệu muộnTrong bất kỳ hệ thống xử lý dữ liệu nào, có một độ trễ nhất định giữa thời điểm một sự kiện dữ liệu xảy ra ("thời gian sự kiện", được xác định bởi dấu thời gian trên chính phần tử dữ liệu) và thời gian phần tử dữ liệu thực tế được xử lý ở bất kỳ giai đoạn nào trong . Ngoài ra, không có gì đảm bảo rằng các sự kiện dữ liệu sẽ xuất hiện trong quy trình bán hàng của bạn theo đúng thứ tự mà chúng được tạo Ví dụ: giả sử chúng ta có một 73 đang sử dụng cửa sổ có thời gian cố định, với các cửa sổ dài năm phút. For each window, Beam must collect all the data with an event time timestamp in the given window range (between 0. 00 và 4. 59 trong cửa sổ đầu tiên chẳng hạn). Dữ liệu có dấu thời gian nằm ngoài phạm vi đó (dữ liệu từ 5. 00 trở lên) thuộc về một cửa sổ khácTuy nhiên, dữ liệu không phải lúc nào cũng được đảm bảo đến theo một đường ống theo thứ tự thời gian hoặc luôn đến theo các khoảng thời gian có thể dự đoán được. Beam theo dõi một hình mờ, đó là khái niệm của hệ thống về thời điểm tất cả dữ liệu trong một cửa sổ nhất định có thể được đưa vào đường ống. Sau khi hình mờ đi qua cuối cửa sổ, bất kỳ phần tử nào khác xuất hiện cùng với dấu thời gian trong cửa sổ đó đều được coi là dữ liệu muộn Từ ví dụ của chúng tôi, giả sử chúng tôi có một hình mờ đơn giản giả định thời gian trễ khoảng 30 giây giữa dấu thời gian dữ liệu (thời gian sự kiện) và thời gian dữ liệu xuất hiện trong đường ống (thời gian xử lý), sau đó Beam sẽ đóng cửa sổ đầu tiên ở mức 5. 30. Nếu một bản ghi dữ liệu đến lúc 5. 34, nhưng với dấu thời gian sẽ đưa nó vào 0. 00-4. 59 cửa sổ (giả sử, 3. 38), thì bản ghi đó là dữ liệu trễ Ghi chú. Để đơn giản, chúng tôi giả định rằng chúng tôi đang sử dụng một hình mờ rất đơn giản để ước tính thời gian trễ. Trên thực tế, nguồn dữ liệu của 73 của bạn xác định hình mờ và hình mờ có thể chính xác hoặc phức tạp hơnCấu hình cửa sổ mặc định của Beam cố gắng xác định thời điểm tất cả dữ liệu đã đến (dựa trên loại nguồn dữ liệu) và sau đó chuyển hình mờ qua cuối cửa sổ. Cấu hình mặc định này không cho phép dữ liệu trễ. cho phép bạn sửa đổi và tinh chỉnh chiến lược cửa sổ cho một 73. Bạn có thể sử dụng trình kích hoạt để quyết định thời điểm mỗi cửa sổ riêng lẻ tổng hợp và báo cáo kết quả của nó, bao gồm cả cách cửa sổ phát ra các phần tử trễ8. 4. 1. Quản lý dữ liệu muộnBạn có thể cho phép dữ liệu trễ bằng cách gọi hoạt động 305 khi bạn đặt chiến lược cửa sổ của 73. Ví dụ mã sau minh họa chiến lược tạo cửa sổ sẽ cho phép dữ liệu trễ tối đa hai ngày sau khi kết thúc cửa sổ 36 37 38Khi bạn đặt 305 trên 73, độ trễ cho phép đó sẽ lan truyền tới bất kỳ 73 nào tiếp theo bắt nguồn từ 73 đầu tiên mà bạn đã áp dụng độ trễ được phép cho. Nếu bạn muốn thay đổi độ trễ được phép sau này trong quy trình bán hàng của mình, bạn phải làm như vậy một cách rõ ràng bằng cách áp dụng 3118. 5. Thêm dấu thời gian vào các phần tử của PCollectionNguồn không giới hạn cung cấp dấu thời gian cho từng phần tử. Tùy thuộc vào nguồn không giới hạn của bạn, bạn có thể cần định cấu hình cách trích xuất dấu thời gian từ luồng dữ liệu thô Tuy nhiên, các nguồn có giới hạn (chẳng hạn như tệp từ 255) không cung cấp dấu thời gian. Nếu bạn cần dấu thời gian, bạn phải thêm chúng vào các phần tử của 73Bạn có thể gán dấu thời gian mới cho các phần tử của 73 bằng cách áp dụng một biến đổi tạo ra các phần tử mới có dấu thời gian mà bạn đã đặtMột ví dụ có thể là nếu quy trình của bạn đọc các bản ghi nhật ký từ một tệp đầu vào và mỗi bản ghi nhật ký bao gồm một trường dấu thời gian; . Bạn có thể phân tích cú pháp trường dấu thời gian từ mỗi bản ghi và sử dụng biến đổi 56 với 83 để đính kèm dấu thời gian vào từng thành phần trong 73 của bạn 39 40 419. Gây nên
Khi thu thập và nhóm dữ liệu vào các cửa sổ, Beam sử dụng trình kích hoạt để xác định thời điểm phát ra kết quả tổng hợp của từng cửa sổ (được gọi là ngăn). Nếu bạn sử dụng cấu hình cửa sổ mặc định của Beam và , Beam sẽ xuất kết quả tổng hợp khi nó và loại bỏ tất cả dữ liệu tiếp theo cho cửa sổ đó Bạn có thể đặt kích hoạt cho 73 của mình để thay đổi hành vi mặc định này. Beam cung cấp một số trình kích hoạt dựng sẵn mà bạn có thể đặt
Ở cấp độ cao, trình kích hoạt cung cấp hai khả năng bổ sung so với việc chỉ xuất ra ở cuối cửa sổ
Những khả năng này cho phép bạn kiểm soát luồng dữ liệu của mình và cân bằng giữa các yếu tố khác nhau tùy thuộc vào trường hợp sử dụng của bạn
Ví dụ: một hệ thống yêu cầu các bản cập nhật nhạy cảm với thời gian có thể sử dụng trình kích hoạt dựa trên thời gian nghiêm ngặt phát ra một cửa sổ sau mỗi N giây, đánh giá tính kịp thời hơn tính đầy đủ của dữ liệu. Một hệ thống coi trọng tính đầy đủ của dữ liệu hơn thời gian chính xác của kết quả có thể chọn sử dụng trình kích hoạt mặc định của Beam, kích hoạt ở cuối cửa sổ Bạn cũng có thể đặt trình kích hoạt cho 73 không giới hạn sử dụng. Điều này có thể hữu ích khi bạn muốn quy trình của mình cung cấp các bản cập nhật định kỳ trên tập dữ liệu không giới hạn — ví dụ: mức trung bình đang chạy của tất cả dữ liệu được cung cấp cho thời điểm hiện tại, được cập nhật N giây hoặc mỗi N phần tử9. 1. Trình kích hoạt thời gian sự kiệnTrình kích hoạt 320 hoạt động vào thời gian diễn ra sự kiện. Trình kích hoạt 320 phát ra nội dung của cửa sổ sau khi vượt qua phần cuối của cửa sổ, dựa trên dấu thời gian được đính kèm với các phần tử dữ liệu. Hình mờ là thước đo tiến độ toàn cầu và là khái niệm của Beam về tính đầy đủ của đầu vào trong quy trình của bạn tại bất kỳ điểm nào. ______54_______322 ______54_______320 324 chỉ kích hoạt khi hình mờ đi qua cuối cửa sổ. Ngoài ra, bạn có thể định cấu hình trình kích hoạt kích hoạt nếu quy trình của bạn nhận dữ liệu trước hoặc sau khi kết thúc cửa sổ Ví dụ sau đây cho thấy một kịch bản thanh toán và sử dụng cả kích hoạt sớm và muộn 42 43 449. 1. 1. Trình kích hoạt mặc địnhTrình kích hoạt mặc định cho 73 dựa trên thời gian sự kiện và phát ra kết quả của cửa sổ khi hình mờ của Beam đi qua cuối cửa sổ, sau đó kích hoạt mỗi khi dữ liệu trễ đếnTuy nhiên, nếu bạn đang sử dụng cả cấu hình cửa sổ mặc định và trình kích hoạt mặc định, trình kích hoạt mặc định sẽ phát chính xác một lần và dữ liệu muộn sẽ bị loại bỏ. Điều này là do cấu hình cửa sổ mặc định có giá trị độ trễ cho phép là 0. Xem phần Xử lý dữ liệu muộn để biết thông tin về cách sửa đổi hành vi này 9. 2. Kích hoạt thời gian xử lýTrình kích hoạt 326 hoạt động theo thời gian xử lý. Ví dụ: trình kích hoạt ______54_______327 ______54_______326 329 phát ra một cửa sổ sau một khoảng thời gian nhất định . Thời gian xử lý được xác định bởi đồng hồ hệ thống, thay vì dấu thời gian của phần tử dữ liệu. Trình kích hoạt 326 hữu ích để kích hoạt kết quả sớm từ một cửa sổ, đặc biệt là cửa sổ có khung thời gian lớn, chẳng hạn như một cửa sổ toàn cầu9. 3. Data-driven triggersBeam cung cấp một trình kích hoạt theo hướng dữ liệu, Ví dụ: điều quan trọng cần lưu ý là nếu bạn chỉ định ______54_______334 CounterCount(50) . Nếu 32 yếu tố quan trọng với bạn, hãy cân nhắc sử dụng để kết hợp nhiều điều kiện. Điều này cho phép bạn chỉ định nhiều điều kiện bắn, chẳng hạn như “bắn khi tôi nhận được 50 phần tử hoặc cứ sau 1 giây”. and only 32 elements arrive, those 32 elements sit around forever. If the 32 elements are important to you, consider using to combine multiple conditions. This allows you to specify multiple firing conditions such as “fire either when I receive 50 elements, or every 1 second”. 9. 4. Đặt trình kích hoạtKhi bạn đặt chức năng tạo cửa sổ cho 73 bằng cách sử dụng 241 338 339 transform, you can also specify a trigger.Bạn đặt (các) trình kích hoạt cho một 73 bằng cách gọi phương thức 341 trên kết quả biến đổi 342 của bạn. Mẫu mã này đặt trình kích hoạt dựa trên thời gian cho 73, trình kích hoạt này đưa ra kết quả một phút sau khi phần tử đầu tiên trong cửa sổ đó được xử lý. Dòng cuối cùng trong mẫu mã, 344, đặt chế độ tích lũy của cửa sổBạn đặt (các) trình kích hoạt cho 73 bằng cách đặt tham số 346 khi bạn sử dụng biến đổi 338. Mẫu mã này đặt trình kích hoạt dựa trên thời gian cho 73, trình kích hoạt này đưa ra kết quả một phút sau khi phần tử đầu tiên trong cửa sổ đó được xử lý. Tham số 349 đặt chế độ tích lũy của cửa sổBạn đặt (các) trình kích hoạt cho một 73 bằng cách chuyển tham số 351 khi bạn sử dụng biến đổi 339. Mẫu mã này đặt trình kích hoạt dựa trên thời gian cho 73, trình kích hoạt này đưa ra kết quả một phút sau khi phần tử đầu tiên trong cửa sổ đó được xử lý. Tham số 354 đặt chế độ tích lũy của cửa sổ 45 46 479. 4. 1. Chế độ tích lũy cửa sổKhi bạn chỉ định trình kích hoạt, bạn cũng phải đặt chế độ tích lũy của cửa sổ. Khi trình kích hoạt kích hoạt, nó sẽ phát ra nội dung hiện tại của cửa sổ dưới dạng một ngăn. Vì trình kích hoạt có thể kích hoạt nhiều lần, nên chế độ tích lũy sẽ xác định xem hệ thống sẽ tích lũy các ô cửa sổ khi trình kích hoạt kích hoạt hay loại bỏ chúng Để đặt một cửa sổ tích lũy các ô được tạo khi trình kích hoạt kích hoạt, hãy gọi_______54_______355 khi bạn đặt kích hoạt. Để đặt một cửa sổ loại bỏ các ô đã kích hoạt, hãy gọi ____54_______344 Để đặt một cửa sổ tích lũy các ô được tạo khi kích hoạt kích hoạt, hãy đặt tham số 349 thành 358 khi bạn đặt kích hoạt. Để đặt một cửa sổ loại bỏ các ô đã nung, hãy đặt 349 thành 360Để đặt một cửa sổ tích lũy các ô được tạo khi trình kích hoạt kích hoạt, hãy đặt tham số 354 thành 362 khi bạn đặt kích hoạt. Để đặt một cửa sổ loại bỏ các tấm đã nung, hãy đặt 354 thành 364Hãy xem một ví dụ sử dụng 73 với cửa sổ thời gian cố định và trình kích hoạt dựa trên dữ liệu. Đây là điều bạn có thể làm nếu, ví dụ: mỗi cửa sổ biểu thị giá trị trung bình đang chạy trong mười phút, nhưng bạn muốn hiển thị giá trị hiện tại của giá trị trung bình trong giao diện người dùng thường xuyên hơn mười phút một lần. Chúng tôi sẽ giả định các điều kiện sau
Sơ đồ sau hiển thị các sự kiện dữ liệu cho khóa X khi chúng đến PCollection và được gán cho các cửa sổ. Để sơ đồ đơn giản hơn một chút, chúng ta sẽ giả sử rằng tất cả các sự kiện đều đến trong quy trình theo thứ tự 9. 4. 1. 1. chế độ tích lũyIf our trigger is set to accumulating mode, the trigger emits the following values each time it fires. Hãy nhớ rằng trình kích hoạt sẽ kích hoạt mỗi khi ba yếu tố đến 489. 4. 1. 2. chế độ loại bỏNếu trình kích hoạt của chúng tôi được đặt ở chế độ loại bỏ, trình kích hoạt sẽ phát ra các giá trị sau trên mỗi lần kích hoạt 499. 4. 2. Xử lý dữ liệu muộnNếu bạn muốn quy trình của mình xử lý dữ liệu đến sau khi hình mờ đi qua cuối cửa sổ, bạn có thể áp dụng độ trễ cho phép khi đặt cấu hình cửa sổ của mình. Điều này giúp trình kích hoạt của bạn có cơ hội phản ứng với dữ liệu muộn. Nếu độ trễ cho phép được đặt, trình kích hoạt mặc định sẽ đưa ra kết quả mới ngay lập tức bất cứ khi nào có dữ liệu trễ Bạn đặt độ trễ cho phép bằng cách sử dụng ______54_______368 50 51 52Độ trễ cho phép này lan truyền tới tất cả các 73 được tạo ra do áp dụng các phép biến đổi cho 73 ban đầu. Nếu bạn muốn thay đổi độ trễ được phép sau này trong quy trình bán hàng của mình, bạn có thể đăng ký 311 369 370 again, explicitly.9. 5. kích hoạt tổng hợpBạn có thể kết hợp nhiều trình kích hoạt để tạo thành trình kích hoạt tổng hợp và có thể chỉ định trình kích hoạt để phát ra kết quả nhiều lần, nhiều nhất một lần hoặc trong các điều kiện tùy chỉnh khác 9. 5. 1. Các loại kích hoạt tổng hợpBeam includes the following composite triggers
9. 5. 2. Bố cục với AfterWatermarkMột số trình kích hoạt tổng hợp hữu ích nhất kích hoạt một lần duy nhất khi Beam ước tính rằng tất cả dữ liệu đã đến (i. e. khi hình mờ đi qua cuối cửa sổ) kết hợp với một trong hai hoặc cả hai điều sau
Bạn có thể diễn đạt mẫu này bằng cách sử dụng 320. Ví dụ: mã kích hoạt ví dụ sau kích hoạt trong các điều kiện sau
53 54 559. 5. 3. Các kích hoạt tổng hợp khácBạn cũng có thể xây dựng các loại trình kích hoạt tổng hợp khác. Mã ví dụ sau hiển thị trình kích hoạt tổng hợp đơn giản kích hoạt bất cứ khi nào ngăn có ít nhất 100 phần tử hoặc sau một phút 56 5710. số liệuTrong mô hình Beam, các chỉ số cung cấp một số thông tin chi tiết về trạng thái hiện tại của quy trình người dùng, có thể xảy ra trong khi quy trình đang chạy. Có thể có những lý do khác nhau cho điều đó, ví dụ
10. 1. Các khái niệm chính về chỉ số Beam
Các chỉ số được báo cáo hoàn toàn nằm trong phạm vi biến đổi trong quy trình đã báo cáo các chỉ số đó. Điều này cho phép báo cáo cùng một tên chỉ số ở nhiều nơi và xác định giá trị mà mỗi biến đổi được báo cáo, cũng như tổng hợp chỉ số trên toàn bộ quy trình
10. 2. Các loại chỉ sốCó ba loại số liệu được hỗ trợ tại thời điểm này. 387, 388 và 389Trong Beam SDK for Go, một 10 do khung cung cấp phải được chuyển đến chỉ số, nếu không giá trị chỉ số sẽ không được ghi lại. Khung sẽ tự động cung cấp một ____73_______10 hợp lệ cho ____65_______36 và các phương thức tương tự khi đó là tham số đầu tiênPhản đối. Số liệu báo cáo một giá trị dài duy nhất và có thể tăng hoặc giảm 58 59Phân bổ. Một số liệu báo cáo thông tin về việc phân phối các giá trị được báo cáo 60 61máy đo. Số liệu báo cáo giá trị mới nhất trong số các giá trị được báo cáo. Vì các số liệu được thu thập từ nhiều công nhân nên giá trị có thể không phải là giá trị cuối cùng tuyệt đối mà là một trong những giá trị mới nhất 62 6310. 3. Số liệu truy vấn 393 có một phương thức 394 trả về một đối tượng 395 cho phép truy cập các chỉ số. Phương pháp chính có sẵn trong 395 cho phép truy vấn tất cả các chỉ số phù hợp với một bộ lọc nhất định 397 có phương thức 398 trả về đối tượng 399 cho phép truy cập số liệu. Phương pháp chính có sẵn trong 399 cho phép truy vấn tất cả các chỉ số phù hợp với một bộ lọc nhất định. Nó nhận một vị ngữ có loại tham số 401, có thể được sử dụng cho các bộ lọc tùy chỉnh 64 6510. 4. Sử dụng số liệu trong quy trìnhBên dưới, có một ví dụ đơn giản về cách sử dụng chỉ số 387 trong quy trình người dùng 66 6710. 5. Xuất chỉ sốSố liệu chùm tia có thể được xuất sang các phần chìm bên ngoài. Nếu một số liệu chìm được thiết lập trong cấu hình, người chạy sẽ đẩy các số liệu tới nó trong khoảng thời gian 5 giây mặc định. Cấu hình được giữ trong lớp MetricsOptions. Nó chứa cấu hình thời gian đẩy và cũng có các tùy chọn cụ thể chìm như loại và URL. Hiện tại, chỉ có REST HTTP và các phần chìm Graphite được hỗ trợ và chỉ các trình chạy Flink và Spark hỗ trợ xuất số liệu Ngoài ra, các số liệu Beam được xuất sang bảng điều khiển Spark và Flink bên trong để được tư vấn trong giao diện người dùng tương ứng của chúng 11. Trạng thái và bộ hẹn giờCác cơ sở kích hoạt và cửa sổ của Beam cung cấp khả năng trừu tượng hóa mạnh mẽ để nhóm và tổng hợp dữ liệu đầu vào không giới hạn dựa trên dấu thời gian. Tuy nhiên, có những trường hợp sử dụng tổng hợp mà các nhà phát triển có thể yêu cầu mức độ kiểm soát cao hơn mức độ kiểm soát được cung cấp bởi các cửa sổ và trình kích hoạt. Beam cung cấp API để quản lý trạng thái theo từng khóa theo cách thủ công, cho phép kiểm soát chi tiết các tập hợp Trạng thái mô hình API trạng thái của Beam trên mỗi khóa. To use the state API, you start out with a keyed 73, which in Java is modeled as a 10. A 56 processing this 73 can now declare state variables. Bên trong 56, các biến trạng thái này có thể được sử dụng để viết hoặc cập nhật trạng thái cho khóa hiện tại hoặc để đọc trạng thái trước đó được viết cho khóa đó. Trạng thái luôn chỉ nằm trong phạm vi đầy đủ đối với khóa xử lý hiện tạiCửa sổ vẫn có thể được sử dụng cùng với xử lý trạng thái. Tất cả trạng thái cho một khóa nằm trong phạm vi cửa sổ hiện tại. Điều này có nghĩa là lần đầu tiên một khóa được nhìn thấy cho một cửa sổ nhất định, bất kỳ trạng thái nào được đọc sẽ trả về trạng thái trống và trình chạy có thể thu gom rác trạng thái khi một cửa sổ hoàn thành. Nó cũng thường hữu ích khi sử dụng các tập hợp cửa sổ của Beam trước toán tử trạng thái. Ví dụ: sử dụng bộ kết hợp để tổng hợp trước dữ liệu, sau đó lưu trữ dữ liệu đã tổng hợp bên trong trạng thái. Các cửa sổ hợp nhất hiện không được hỗ trợ khi sử dụng trạng thái và bộ hẹn giờ Đôi khi xử lý trạng thái được sử dụng để triển khai xử lý kiểu máy trạng thái bên trong một 83. Khi thực hiện việc này, cần phải lưu ý rằng các thành phần trong PCCollection đầu vào không có thứ tự được đảm bảo và để đảm bảo rằng logic chương trình có khả năng phục hồi với điều này. Các bài kiểm tra đơn vị được viết bằng DirectRunner sẽ xáo trộn thứ tự xử lý phần tử và được khuyến nghị để kiểm tra tính chính xácTrong Java, DoFn tuyên bố các trạng thái sẽ được truy cập bằng cách tạo các biến thành viên 409 cuối cùng đại diện cho mỗi trạng thái. Mỗi tiểu bang phải được đặt tên bằng cách sử dụng chú thích 410; . Một 83 có thể khai báo nhiều biến trạng tháiTrong Python, DoFn khai báo các trạng thái được truy cập bằng cách tạo các biến thành viên lớp 409 đại diện cho từng trạng thái. Mỗi 409 được khởi tạo với một tên, tên này là duy nhất cho một ParDo trong biểu đồ và không liên quan đến các nút khác trong biểu đồ. A 83 can declare multiple state variablesTrong Go, DoFn khai báo các trạng thái sẽ được truy cập bằng cách tạo các biến thành viên cấu trúc trạng thái đại diện cho từng trạng thái. Mỗi biến trạng thái được khởi tạo bằng một khóa, khóa này là duy nhất cho một ParDo trong biểu đồ và không liên quan đến các nút khác trong biểu đồ. Nếu không cung cấp tên, khóa sẽ mặc định là tên của biến thành viên. Một 83 có thể khai báo nhiều biến trạng thái
11. 1. Các loại trạng tháiBeam cung cấp một số loại trạng thái ValueStateValueState là một giá trị trạng thái vô hướng. Đối với mỗi khóa trong đầu vào, một ValueState sẽ lưu trữ một giá trị đã nhập có thể được đọc và sửa đổi bên trong các phương thức 12 hoặc 41 của DoFn. Nếu loại ValueState có bộ mã hóa được đăng ký, thì Beam sẽ tự động suy ra bộ mã hóa cho giá trị trạng thái. Mặt khác, một bộ mã hóa có thể được chỉ định rõ ràng khi tạo ValueState. Ví dụ: ParDo sau đây tạo một biến trạng thái duy nhất tích lũy số lượng phần tử được nhìn thấyGhi chú. 418 được gọi là 419 trong Python SDK 68 69Beam cũng cho phép chỉ định rõ ràng một bộ mã hóa cho các giá trị 418. For example 70 71 72 73Kết hợpTrạng thái 421 cho phép bạn tạo một đối tượng trạng thái được cập nhật bằng bộ kết hợp Beam. Ví dụ: ví dụ về 418 trước đó có thể được viết lại để sử dụng 421 74 75 76TúiBangMột trường hợp sử dụng phổ biến cho trạng thái là tích lũy nhiều phần tử. 424 cho phép tích lũy một tập hợp các phần tử không có thứ tự. Điều này cho phép thêm các phần tử vào bộ sưu tập mà không yêu cầu đọc toàn bộ bộ sưu tập trước, đây là một hiệu quả đạt được. Ngoài ra, các vận động viên hỗ trợ đọc theo trang có thể cho phép các túi riêng lẻ lớn hơn bộ nhớ khả dụng 77 78 3511. 2. Đọc trạng thái hoãn lạiKhi một 83 chứa nhiều thông số trạng thái, việc đọc từng thông số theo thứ tự có thể chậm. Gọi hàm 426 ở trạng thái có thể khiến người chạy thực hiện thao tác chặn đọc. Thực hiện nhiều lần đọc chặn theo trình tự sẽ thêm độ trễ cho quá trình xử lý phần tử. Nếu bạn biết rằng một trạng thái sẽ luôn được đọc, bạn có thể chú thích trạng thái đó là @AlwaysFetched, sau đó trình chạy có thể tìm nạp trước tất cả các trạng thái cần thiết. Ví dụ 80 81 82Tuy nhiên, nếu có các đường dẫn mã trong đó các trạng thái không được tìm nạp, thì việc chú thích bằng @AlwaysFetched sẽ thêm tính năng tìm nạp không cần thiết cho các đường dẫn đó. Trong trường hợp này, phương thức readLater cho phép người chạy biết rằng trạng thái sẽ được đọc trong tương lai, cho phép nhiều lần đọc trạng thái được gộp lại với nhau 8311. 3. hẹn giờBeam cung cấp API gọi lại hẹn giờ cho mỗi phím. Điều này cho phép xử lý chậm dữ liệu được lưu trữ bằng API trạng thái. Bộ hẹn giờ có thể được đặt thành gọi lại tại thời điểm sự kiện hoặc dấu thời gian xử lý. Mỗi bộ đếm thời gian được xác định bằng TimerId. Chỉ có thể đặt bộ hẹn giờ nhất định cho một phím cho một dấu thời gian. Cuộc gọi được đặt trên bộ hẹn giờ sẽ ghi đè thời gian kích hoạt trước đó cho bộ hẹn giờ của phím đó 11. 3. 1. bộ hẹn giờ sự kiệnBộ hẹn giờ thời gian sự kiện kích hoạt khi hình mờ đầu vào cho DoFn vượt qua thời gian mà bộ hẹn giờ được đặt, nghĩa là người chạy tin rằng không có thêm yếu tố nào được xử lý bằng dấu thời gian trước dấu thời gian của bộ hẹn giờ. Điều này cho phép tổng hợp thời gian sự kiện 84 85 8611. 3. 2. bộ định thời gian xử lýBộ hẹn giờ thời gian xử lý kích hoạt khi thời gian thực của đồng hồ treo tường trôi qua. Điều này thường được sử dụng để tạo các lô dữ liệu lớn hơn trước khi xử lý. Nó cũng có thể được sử dụng để lên lịch các sự kiện sẽ xảy ra vào một thời điểm cụ thể. Just like with event-time timers, processing-time timers are per key - each key has a separate copy of the timer Mặc dù bộ hẹn giờ thời gian xử lý có thể được đặt thành dấu thời gian tuyệt đối, nhưng rất phổ biến để đặt chúng thành phần bù so với thời gian hiện tại. Trong Java, các phương thức 427 và 428 có thể được sử dụng để thực hiện điều này 87 88 8611. 3. 3. Thẻ hẹn giờ độngBeam cũng hỗ trợ tự động thiết lập thẻ hẹn giờ bằng cách sử dụng 429 trong SDK Java. Điều này cho phép đặt nhiều bộ hẹn giờ khác nhau trong một 83 và cho phép các thẻ bộ hẹn giờ được chọn động - e. g. dựa trên dữ liệu trong các yếu tố đầu vào. Chỉ có thể đặt bộ hẹn giờ với một thẻ cụ thể thành một dấu thời gian duy nhất, do đó, việc đặt lại bộ hẹn giờ có tác dụng ghi đè thời gian hết hạn trước đó cho bộ hẹn giờ với thẻ đó. Mỗi 429 được xác định bằng id họ bộ hẹn giờ và các bộ hẹn giờ trong các họ bộ hẹn giờ khác nhau là độc lậpTrong SDK Python, thẻ hẹn giờ động có thể được chỉ định trong khi gọi 432 hoặc 433. Theo mặc định, thẻ hẹn giờ là một chuỗi rỗng nếu không được chỉ định 90 91 8611. 3. 4. Dấu thời gian đầu ra của bộ hẹn giờTheo mặc định, bộ hẹn giờ theo thời gian sự kiện sẽ giữ hình mờ đầu ra của 56 cho dấu thời gian của bộ hẹn giờ. Điều này có nghĩa là nếu bộ hẹn giờ được đặt thành 12 giờ đêm, thì bất kỳ tập hợp cửa sổ hoặc bộ hẹn giờ sự kiện nào sau này trong biểu đồ quy trình kết thúc sau 12 giờ đêm sẽ không hết hạn. Dấu thời gian của bộ hẹn giờ cũng là dấu thời gian đầu ra mặc định cho lệnh gọi lại bộ hẹn giờ. This means that any elements output from the onTimer method will have a timestamp equal to the timestamp of the timer firing. Đối với bộ hẹn giờ thời gian xử lý, dấu thời gian đầu ra mặc định và giữ hình mờ là giá trị của hình mờ đầu vào tại thời điểm bộ hẹn giờ được đặtTrong một số trường hợp, DoFn cần xuất dấu thời gian sớm hơn thời gian hết hạn của bộ hẹn giờ và do đó cũng cần giữ hình mờ đầu ra của nó đối với các dấu thời gian đó. Ví dụ: hãy xem xét quy trình sau tạm thời nhóm các bản ghi vào trạng thái và đặt bộ hẹn giờ để thoát trạng thái. Mã này có thể xuất hiện đúng, nhưng sẽ không hoạt động bình thường 93Vấn đề với mã này là ParDo đang đệm các phần tử, tuy nhiên không có gì ngăn hình mờ vượt qua dấu thời gian của các phần tử đó, vì vậy tất cả các phần tử đó có thể bị loại bỏ dưới dạng dữ liệu muộn. Để ngăn điều này xảy ra, dấu thời gian đầu ra cần được đặt trên bộ hẹn giờ để ngăn hình mờ vượt qua dấu thời gian của phần tử tối thiểu. Đoạn mã sau chứng minh điều này 94 95 8611. 4. Tình trạng thu gom rácTrạng thái trên mỗi khóa cần được thu gom rác hoặc cuối cùng kích thước ngày càng tăng của trạng thái có thể tác động tiêu cực đến hiệu suất. Có hai chiến lược phổ biến cho trạng thái thu gom rác 11. 4. 1. Sử dụng cửa sổ để thu gom rácTất cả trạng thái và bộ hẹn giờ cho một khóa nằm trong phạm vi cửa sổ chứa khóa đó. Điều này có nghĩa là tùy thuộc vào dấu thời gian của phần tử đầu vào, ParDo sẽ thấy các giá trị khác nhau cho trạng thái tùy thuộc vào cửa sổ mà phần tử đó rơi vào. Ngoài ra, khi hình mờ đầu vào đi qua cuối cửa sổ, trình chạy sẽ thu thập tất cả trạng thái cho cửa sổ đó. (note. if allowed lateness is set to a positive value for the window, the runner must wait for the watermark to pass the end of the window plus the allowed lateness before garbage collecting state). Điều này có thể được sử dụng như một chiến lược thu gom rác Ví dụ, được đưa ra sau đây 97 98 99Trạng thái cửa hàng 56 này mỗi ngày. Khi đường ống xử lý xong dữ liệu cho một ngày nhất định, tất cả trạng thái của ngày đó sẽ được thu gom rác11. 4. 1. Sử dụng bộ hẹn giờ để thu gom rácTrong một số trường hợp, rất khó để tìm ra chiến lược cửa sổ mô hình hóa chiến lược thu gom rác mong muốn. Ví dụ: mong muốn chung là chuyển trạng thái thu gom rác cho một khóa sau khi không thấy hoạt động nào trên khóa trong một thời gian. Điều này có thể được thực hiện bằng cách cập nhật bộ đếm thời gian mà trạng thái thu gom rác. Ví dụ 00 01 8611. 5. Ví dụ về trạng thái và bộ hẹn giờSau đây là một số ví dụ sử dụng trạng thái và bộ hẹn giờ 11. 5. 1. Kết hợp các nhấp chuột và lượt xemTrong ví dụ này, quy trình đang xử lý dữ liệu từ trang chủ của trang thương mại điện tử. There are two input streams. luồng lượt xem, biểu thị các liên kết sản phẩm được đề xuất hiển thị cho người dùng trên trang chủ và luồng lượt nhấp, biểu thị lượt nhấp thực tế của người dùng vào các liên kết này. Mục tiêu của quy trình là kết hợp các sự kiện nhấp chuột với các sự kiện xem, xuất ra một sự kiện đã tham gia mới chứa thông tin từ cả hai sự kiện. Mỗi liên kết có một mã định danh duy nhất có trong cả sự kiện xem và sự kiện tham gia Nhiều sự kiện xem sẽ không bao giờ được theo dõi bằng các nhấp chuột. Đường ống này sẽ đợi một giờ cho một nhấp chuột, sau đó nó sẽ từ bỏ tham gia này. Mặc dù mỗi sự kiện nhấp chuột phải có một sự kiện xem, nhưng một số ít sự kiện xem có thể bị mất và không bao giờ được đưa vào quy trình Beam; . Các sự kiện đầu vào không được sắp xếp - có thể thấy sự kiện nhấp chuột trước sự kiện xem. Thời gian chờ tham gia một giờ phải dựa trên thời gian sự kiện, không phải thời gian xử lý 03 0411. 5. 2. RPC theo đợtTrong ví dụ này, các yếu tố đầu vào đang được chuyển tiếp đến một dịch vụ RPC bên ngoài. RPC chấp nhận các yêu cầu hàng loạt - nhiều sự kiện cho cùng một người dùng có thể được xử lý theo đợt trong một lệnh gọi RPC. Vì dịch vụ RPC này cũng áp đặt giới hạn tốc độ nên chúng tôi muốn gộp các sự kiện có giá trị 10 giây lại với nhau để giảm số lượng cuộc gọi 05 0612. Có thể chia tách await beam.createRunner().run(function pipeline(root) { // Use root to build a pipeline. }); 67Một 83 có thể chia được (SDF) cho phép người dùng tạo các thành phần mô-đun có chứa I/O (và một số ) nâng cao. Having modular I/O components that can be connected to each other simplify typical patterns that users want. Ví dụ: trường hợp sử dụng phổ biến là đọc tên tệp từ hàng đợi tin nhắn, sau đó phân tích cú pháp các tệp đó. Theo truyền thống, người dùng được yêu cầu viết một trình kết nối I/O duy nhất chứa logic cho hàng đợi tin nhắn và trình đọc tệp (độ phức tạp tăng lên) hoặc chọn sử dụng lại I/O hàng đợi tin nhắn theo sau bởi một 83 thông thường đọc tệp ( . Với SDF, chúng tôi mang đến sự phong phú của các API I/O của Apache Beam thành một mô-đun cho phép 83 trong khi vẫn duy trì hiệu suất của các đầu nối I/O truyền thống12. 1. Khái niệm cơ bản về SDFỞ cấp độ cao, SDF chịu trách nhiệm xử lý các cặp phần tử và giới hạn. Một hạn chế đại diện cho một tập hợp con công việc cần thiết phải được thực hiện khi xử lý phần tử Thực hiện một SDF theo các bước sau
12. 1. 1. Một SDF cơ bảnA basic SDF is composed of three parts. hạn chế, nhà cung cấp hạn chế và trình theo dõi hạn chế. Nếu bạn muốn kiểm soát hình mờ, đặc biệt là trong đường ống phát trực tuyến, cần có thêm hai thành phần. nhà cung cấp công cụ ước tính thủy vân và công cụ ước tính thủy vân Hạn chế là một đối tượng do người dùng định nghĩa được sử dụng để biểu thị một tập hợp con công việc cho một phần tử nhất định. Ví dụ: chúng tôi đã xác định 440 là hạn chế để biểu thị các vị trí bù trong Java vàNhà cung cấp hạn chế cho phép các tác giả SDF ghi đè các triển khai mặc định, bao gồm cả các triển khai để phân tách và định cỡ. Trong Java và , đây là 83. có một loại 442 chuyên dụngTrình theo dõi hạn chế chịu trách nhiệm theo dõi tập hợp con nào của hạn chế đã được hoàn thành trong quá trình xử lý. Để biết chi tiết về API, hãy đọc tài liệu tham khảo và Java Có một số triển khai 443 tích hợp được định nghĩa trong Java
SDF cũng có triển khai 443 tích hợp sẵn trong PythonGo cũng có loại 443 tích hợp sẵn
Trạng thái hình mờ là một đối tượng do người dùng xác định được sử dụng để tạo một 446 từ một 447. Trạng thái hình mờ đơn giản nhất có thể là một 448Nhà cung cấp công cụ ước tính hình mờ cho phép các tác giả SDF xác định cách khởi tạo trạng thái hình mờ và tạo công cụ ước tính hình mờ. In Java and this is the 83. có một loại 447 chuyên dụngCông cụ ước tính thủy vân theo dõi thủy vân khi một cặp hạn chế phần tử đang được tiến hành. Để biết chi tiết về API, hãy đọc tài liệu tham khảo về Java, , và Có một số triển khai 446 tích hợp sẵn trong Java
Cùng với 447 mặc định, có cùng một bộ triển khai 446 tích hợp sẵn trong PythonCác loại 446 sau đây được triển khai trong GoĐể xác định SDF, bạn phải chọn xem SDF có giới hạn (mặc định) hay không giới hạn và xác định cách khởi tạo giới hạn ban đầu cho một phần tử. Sự phân biệt dựa trên cách thể hiện khối lượng công việc
Trong Java, bạn có thể sử dụng @UnboundedPerElement hoặc @BoundedPerElement để chú thích 83 của mình. Trong Python, bạn có thể sử dụng để chú thích 83 07 08 09Tại thời điểm này, chúng tôi có một SDF hỗ trợ cho phép tái cân bằng công việc động. Để tăng tốc độ xảy ra song song hóa công việc ban đầu hoặc đối với những người chạy không hỗ trợ phân tách do người chạy bắt đầu, chúng tôi khuyên bạn nên cung cấp một tập hợp các lần phân tách ban đầu 10 11 1212. 2. Định cỡ và tiến độĐịnh cỡ và tiến độ được sử dụng trong quá trình thực thi SDF để thông báo cho người chạy để họ có thể thực hiện các quyết định thông minh về việc phân chia những hạn chế nào và cách song song hóa công việc Trước khi xử lý một phần tử và hạn chế, người chạy có thể sử dụng kích thước ban đầu để chọn cách thức và người xử lý các hạn chế nhằm cải thiện sự cân bằng ban đầu và song song hóa công việc. Trong quá trình xử lý một phần tử và hạn chế, định cỡ và tiến trình được sử dụng để chọn phân chia hạn chế nào và ai sẽ xử lý chúng Theo mặc định, chúng tôi sử dụng ước tính của trình theo dõi hạn chế cho công việc còn lại dựa trên giả định rằng tất cả các hạn chế đều có chi phí bằng nhau. Để ghi đè mặc định, tác giả SDF có thể cung cấp phương pháp thích hợp trong nhà cung cấp hạn chế. Các tác giả SDF cần lưu ý rằng phương pháp định cỡ sẽ được gọi đồng thời trong quá trình xử lý gói do quá trình phân tách và ước tính tiến độ bắt đầu của người chạy 13 14 1512. 3. Điểm kiểm tra do người dùng khởi tạoMột số I/O không thể tạo ra tất cả dữ liệu cần thiết để hoàn thành một hạn chế trong thời gian tồn tại của một gói. Điều này thường xảy ra với các hạn chế không giới hạn, nhưng cũng có thể xảy ra với các hạn chế có giới hạn. For example, there could be more data that needs to be ingested but is not available yet. Một nguyên nhân khác của trường hợp này là do hệ thống nguồn điều chỉnh dữ liệu của bạn SDF của bạn có thể báo hiệu cho bạn biết rằng bạn chưa xử lý xong hạn chế hiện tại. Tín hiệu này có thể gợi ý thời gian để tiếp tục tại. Trong khi người chạy cố gắng tôn trọng thời gian tiếp tục, điều này không được đảm bảo. Điều này cho phép thực thi tiếp tục trên một hạn chế có sẵn công việc cải thiện việc sử dụng tài nguyên 16 17 1812. 4. Runner-khởi xướng táchNgười chạy bất cứ lúc nào cũng có thể cố gắng tách một giới hạn trong khi nó đang được xử lý. Điều này cho phép người chạy tạm dừng xử lý hạn chế để có thể thực hiện công việc khác (phổ biến đối với các hạn chế không giới hạn để hạn chế lượng đầu ra và/hoặc cải thiện độ trễ) hoặc chia hạn chế thành hai phần, tăng khả năng song song có sẵn trong hệ thống. vận động viên khác nhau (e. g. , Dataflow, Flink, Spark) có các chiến lược khác nhau để phân tách theo đợt và thực thi trực tuyến Hãy ghi nhớ điều này khi viết một SDF vì thời điểm kết thúc hạn chế có thể thay đổi. Khi viết vòng lặp xử lý, hãy sử dụng kết quả từ việc cố gắng yêu cầu một phần của hạn chế thay vì cho rằng bạn có thể xử lý cho đến khi kết thúc Một ví dụ không chính xác có thể là 19 20 2112. 5. ước tính hình mờCông cụ ước tính hình mờ mặc định không tạo ra ước tính hình mờ. Do đó, hình mờ đầu ra chỉ được tính bằng số lượng hình mờ ngược dòng tối thiểu Một SDF có thể nâng cao hình mờ đầu ra bằng cách chỉ định giới hạn dưới cho tất cả đầu ra trong tương lai mà phần tử này và cặp hạn chế sẽ tạo ra. Người chạy tính toán hình mờ đầu ra tối thiểu bằng cách lấy giá trị tối thiểu trên tất cả các hình mờ ngược dòng và giá trị tối thiểu được báo cáo bởi từng phần tử và cặp giới hạn. Hình mờ được báo cáo phải tăng đơn điệu cho từng phần tử và cặp hạn chế trên các ranh giới gói. Khi một phần tử và cặp hạn chế ngừng xử lý hình mờ của nó, nó không còn được coi là một phần của phép tính trên Lời khuyên
12. 5. 1. Kiểm soát hình mờCó hai loại công cụ ước tính thủy vân chung. quan sát dấu thời gian và quan sát đồng hồ bên ngoài. Công cụ ước tính hình mờ quan sát dấu thời gian sử dụng dấu thời gian đầu ra của mỗi bản ghi để tính toán ước tính hình mờ trong khi công cụ ước tính hình mờ quan sát đồng hồ bên ngoài kiểm soát hình mờ bằng cách sử dụng đồng hồ không được liên kết với bất kỳ đầu ra riêng lẻ nào, chẳng hạn như đồng hồ cục bộ của máy hoặc đồng hồ tiếp xúc Nhà cung cấp công cụ ước tính hình mờ cho phép bạn ghi đè logic ước tính hình mờ mặc định và sử dụng triển khai công cụ ước tính hình mờ hiện có. Bạn cũng có thể cung cấp triển khai ước tính hình mờ của riêng mình 22 23 2412. 6. Cắt ngắn trong quá trình thoát nướcNgười chạy hỗ trợ đường ống thoát nước cần có khả năng thoát SDF; . Theo mặc định, các hạn chế có giới hạn xử lý phần còn lại của hạn chế trong khi các hạn chế không có giới hạn kết thúc quá trình xử lý tại điểm kiểm tra do SDF bắt đầu tiếp theo hoặc phân tách do người chạy bắt đầu. Bạn có thể ghi đè hành vi mặc định này bằng cách xác định phương thức thích hợp trên trình cung cấp hạn chế Ghi chú. Sau khi quá trình thoát đường ống bắt đầu và biến đổi hạn chế cắt ngắn được kích hoạt, 457 sẽ không được lên lịch lại 25 26 2712. 7. Quyết toán góiHoàn thiện gói cho phép một 83 thực hiện các tác dụng phụ bằng cách đăng ký gọi lại. Cuộc gọi lại được gọi sau khi người chạy đã thừa nhận rằng nó đã duy trì đầu ra một cách lâu dài. Ví dụ: một hàng đợi tin nhắn có thể cần xác nhận các tin nhắn mà nó đã nhập vào đường dẫn. Quyết toán gói không giới hạn ở SDF nhưng được gọi ra ở đây vì đây là trường hợp sử dụng chính 28 29 3013. Đường ống đa ngôn ngữPhần này cung cấp tài liệu toàn diện về các đường dẫn đa ngôn ngữ. Để bắt đầu tạo một kênh dẫn đa ngôn ngữ, hãy xem
Beam cho phép bạn kết hợp các biến đổi được viết bằng bất kỳ ngôn ngữ SDK được hỗ trợ nào (hiện tại là Java và Python) và sử dụng chúng trong một kênh dẫn đa ngôn ngữ. Khả năng này giúp dễ dàng cung cấp chức năng mới đồng thời trong các Apache Beam SDK khác nhau thông qua một chuyển đổi ngôn ngữ chéo duy nhất. Ví dụ: trình kết nối Apache Kafka và chuyển đổi SQL từ SDK Java có thể được sử dụng trong các đường dẫn Python Các quy trình sử dụng biến đổi từ nhiều ngôn ngữ SDK được gọi là quy trình đa ngôn ngữ 13. 1. Tạo chuyển đổi ngôn ngữ chéoĐể cung cấp các biến đổi được viết bằng một ngôn ngữ cho các quy trình được viết bằng ngôn ngữ khác, Beam sử dụng một dịch vụ mở rộng, dịch vụ này tạo và đưa các đoạn quy trình dành riêng cho ngôn ngữ thích hợp vào quy trình Trong ví dụ sau, một đường dẫn Beam Python khởi động một dịch vụ mở rộng Java cục bộ để tạo và thêm các đoạn đường dẫn Java thích hợp để thực thi chuyển đổi ngôn ngữ chéo Java Kafka thành đường dẫn Python. SDK sau đó tải xuống và tạo các giai đoạn phụ thuộc Java cần thiết để thực hiện các biến đổi này Khi chạy, trình chạy Beam sẽ thực thi cả hai phép biến đổi Python và Java để chạy đường dẫn Trong phần này, chúng tôi sẽ sử dụng KafkaIO. Đọc để minh họa cách tạo chuyển đổi ngôn ngữ chéo cho Java và ví dụ thử nghiệm cho Python 13. 1. 1. Tạo các biến đổi Java đa ngôn ngữThere are two ways to make Java transforms available to other SDKs
13. 1. 1. 1 Sử dụng các biến đổi Java hiện có mà không cần viết thêm mã JavaBắt đầu với Beam 2. 34. 0, người dùng SDK Python có thể sử dụng một số biến đổi Java mà không cần viết mã Java bổ sung. Điều này có thể hữu ích trong nhiều trường hợp. Ví dụ
Để đủ điều kiện sử dụng trực tiếp, API của biến đổi Java phải đáp ứng các yêu cầu sau
Đây là một lớp Java mẫu có thể được sử dụng trực tiếp từ API Python 31Để biết ví dụ hoàn chỉnh, hãy xem JavaDataGenerator Để sử dụng một lớp Java tuân thủ các yêu cầu trên từ đường dẫn SDK Python, hãy làm theo các bước sau
Bắt đầu với Beam 2. 36. 0, có thể bỏ qua bước 1 và 2, như được mô tả trong các phần tương ứng bên dưới Bước 1 Để sử dụng biến đổi Java đủ điều kiện từ Python, hãy xác định danh sách cho phép yaml. Danh sách cho phép này liệt kê tên lớp, phương thức hàm tạo và phương thức trình tạo có sẵn trực tiếp để sử dụng từ phía Python Bắt đầu với Beam 2. 35. 0, bạn có tùy chọn chuyển 006 sang tùy chọn 459 thay vì xác định danh sách cho phép thực tế. 006 chỉ định rằng tất cả các biến đổi được hỗ trợ trong đường dẫn lớp của dịch vụ mở rộng có thể được truy cập thông qua API. Chúng tôi khuyến khích sử dụng danh sách cho phép thực tế để sản xuất, vì việc cho phép khách hàng truy cập các lớp Java tùy ý có thể gây ra rủi ro bảo mật 32Bước 2 Cung cấp danh sách cho phép làm đối số khi khởi động dịch vụ mở rộng Java. Ví dụ: bạn có thể bắt đầu dịch vụ mở rộng dưới dạng quy trình Java cục bộ bằng cách sử dụng lệnh sau 33Bắt đầu với Beam 2. 36. 0, API 463 sẽ tự động khởi động dịch vụ mở rộng với phần phụ thuộc tệp 464 nhất định nếu địa chỉ dịch vụ mở rộng không được cung cấpBước 3 Bạn có thể sử dụng lớp Java trực tiếp từ đường dẫn Python của mình bằng cách sử dụng một biến đổi sơ khai được tạo từ API 463. API này cho phép bạn xây dựng biến đổi bằng cách sử dụng tên lớp Java và cho phép bạn gọi các phương thức của trình tạo để định cấu hình lớpCác loại tham số của hàm tạo và phương thức được ánh xạ giữa Python và Java bằng cách sử dụng lược đồ Beam. Lược đồ được tạo tự động bằng cách sử dụng các loại đối tượng được cung cấp ở phía Python. Nếu phương thức hàm tạo lớp Java hoặc phương thức trình tạo chấp nhận bất kỳ loại đối tượng phức tạp nào, hãy đảm bảo rằng lược đồ Beam cho các đối tượng này đã được đăng ký và sẵn dùng cho dịch vụ mở rộng Java. Nếu lược đồ chưa được đăng ký, dịch vụ mở rộng Java sẽ cố gắng đăng ký lược đồ bằng cách sử dụng. Trong Python, các đối tượng tùy ý có thể được biểu diễn bằng cách sử dụng các 052, sẽ được biểu diễn dưới dạng các hàng Beam trong lược đồ. Đây là một biến đổi sơ khai Python đại diện cho biến đổi Java đã đề cập ở trên 34Bạn có thể sử dụng biến đổi này trong một đường dẫn Python cùng với các biến đổi Python khác. Để biết ví dụ hoàn chỉnh, hãy xem javadatagenerator. py 13. 1. 1. 2 Sử dụng API để cung cấp các biến đổi Java hiện có cho các SDK khácĐể làm cho Beam Java SDK của bạn có thể chuyển đổi linh hoạt trên các ngôn ngữ SDK, bạn phải triển khai hai giao diện. ExternalTransformBuilder and ExternalTransformRegistrar. The 467 interface constructs the cross-language transform using configuration values passed in from the pipeline, and the 468 interface registers the cross-language transform for use with the expansion serviceTriển khai các giao diện
Sau khi bạn đã triển khai các giao diện 467 và 468, biến đổi của bạn có thể được đăng ký và tạo thành công bởi dịch vụ mở rộng Java mặc địnhBắt đầu dịch vụ mở rộng You can use an expansion service with multiple transforms in the same pipeline. The Beam Java SDK provides a default expansion service for Java transforms. Bạn cũng có thể viết dịch vụ mở rộng của riêng mình, nhưng điều đó thường không cần thiết, vì vậy nó không được đề cập trong phần này Thực hiện các thao tác sau để khởi động trực tiếp dịch vụ mở rộng Java 37Dịch vụ mở rộng hiện đã sẵn sàng để phục vụ các biến đổi trên cổng được chỉ định Khi tạo trình bao bọc dành riêng cho SDK cho biến đổi của mình, bạn có thể sử dụng các tiện ích do SDK cung cấp để bắt đầu dịch vụ mở rộng. Ví dụ: SDK Python cung cấp các tiện ích và để khởi động dịch vụ mở rộng Java bằng tệp JAR bao gồm các phụ thuộc Nếu biến đổi của bạn yêu cầu các thư viện bên ngoài, bạn có thể đưa chúng vào bằng cách thêm chúng vào đường dẫn lớp của dịch vụ mở rộng. Sau khi chúng được đưa vào đường dẫn lớp, chúng sẽ được phân đoạn khi biến đổi của bạn được mở rộng bởi dịch vụ mở rộng Viết trình bao bọc dành riêng cho SDK Chuyển đổi Java ngôn ngữ chéo của bạn có thể được gọi thông qua lớp cấp thấp hơn trong một đường dẫn đa ngôn ngữ (như được mô tả trong phần tiếp theo); . Sự trừu tượng hóa cấp cao hơn này sẽ giúp các tác giả đường ống sử dụng biến đổi của bạn dễ dàng hơn Để tạo trình bao bọc SDK để sử dụng trong đường dẫn Python, hãy làm như sau
13. 1. 2. Tạo các biến đổi Python đa ngôn ngữBất kỳ biến đổi Python nào được xác định trong phạm vi của dịch vụ mở rộng đều có thể truy cập được bằng cách chỉ định tên đủ điều kiện của chúng. Ví dụ: bạn có thể sử dụng biến đổi 493 của Python trong một đường ống dẫn Java với tên đủ điều kiện của nó là 494 40
Ngoài ra, bạn có thể muốn tạo một mô-đun Python đăng ký một biến đổi Python hiện có dưới dạng một biến đổi ngôn ngữ chéo để sử dụng với dịch vụ mở rộng Python và gọi vào biến đổi hiện có đó để thực hiện thao tác dự định của nó. Một URN đã đăng ký có thể được sử dụng sau này trong yêu cầu mở rộng để chỉ ra mục tiêu mở rộng Xác định mô-đun Python
Bắt đầu dịch vụ mở rộng Một dịch vụ mở rộng có thể được sử dụng với nhiều biến đổi trong cùng một đường ống. Beam Python SDK cung cấp dịch vụ mở rộng mặc định để bạn sử dụng với các biến đổi Python của mình. Bạn có thể tự do viết dịch vụ mở rộng của riêng mình, nhưng điều đó thường không cần thiết, vì vậy nó không được đề cập trong phần này Thực hiện các bước sau để khởi động trực tiếp dịch vụ mở rộng Python mặc định
bao gồm các phụ thuộc Hiện tại, các phép biến đổi bên ngoài của Python bị giới hạn ở các phần phụ thuộc có sẵn trong bộ khai thác Beam SDK cốt lõi 13. 1. 3. Tạo biến đổi ngôn ngữ chéoGo hiện không hỗ trợ tạo biến đổi ngôn ngữ chéo, chỉ sử dụng biến đổi ngôn ngữ chéo từ các ngôn ngữ khác; 13. 1. 4. Định nghĩa một URNPhát triển chuyển đổi ngôn ngữ chéo liên quan đến việc xác định URN để đăng ký chuyển đổi với dịch vụ mở rộng. Trong phần này, chúng tôi cung cấp một quy ước để xác định các URN như vậy. Việc tuân theo quy ước này là tùy chọn nhưng nó sẽ đảm bảo rằng biến đổi của bạn sẽ không gặp xung đột khi đăng ký dịch vụ mở rộng cùng với các biến đổi do các nhà phát triển khác phát triển 13. 1. 4. 1. Lược đồMột URN nên bao gồm các thành phần sau
Chúng tôi cung cấp lược đồ từ quy ước URN ở dạng Backus–Naur tăng cường. Từ khóa trong chữ hoa là từ thông số URN 4813. 1. 4. 2. ví dụDưới đây chúng tôi đã đưa ra một số ví dụ về các lớp biến đổi và các URN tương ứng sẽ được sử dụng
13. 2. Sử dụng biến đổi ngôn ngữ chéoDepending on the SDK language of the pipeline, you can use a high-level SDK-wrapper class, or a low-level transform class to access a cross-language transform 13. 2. 1. Sử dụng các biến đổi ngôn ngữ chéo trong một đường dẫn JavaNgười dùng có ba tùy chọn để sử dụng chuyển đổi ngôn ngữ chéo trong đường dẫn Java. Ở mức độ trừu tượng cao nhất, một số biến đổi Python phổ biến có thể truy cập được thông qua các biến đổi trình bao bọc Java chuyên dụng. Ví dụ: SDK Java có lớp 506, lớp này sử dụng lớp 506 của SDK Python và lớp này có lớp 508, lớp này sử dụng lớp 508 của SDK Python, v.v. Khi biến đổi trình bao bọc dành riêng cho SDK không có sẵn cho biến đổi Python đích, bạn có thể sử dụng lớp PythonExternalTransform cấp thấp hơn thay vào đó bằng cách chỉ định tên đủ điều kiện của biến đổi Python. Nếu bạn muốn thử chuyển đổi bên ngoài từ SDK khác ngoài Python (bao gồm cả SDK Java), bạn cũng có thể sử dụng lớp Bên ngoài cấp thấp nhấtSử dụng trình bao bọc SDK Để sử dụng chuyển đổi ngôn ngữ chéo thông qua trình bao bọc SDK, hãy nhập mô-đun cho trình bao bọc SDK và gọi nó từ quy trình của bạn, như minh họa trong ví dụ 49Sử dụng lớp PythonExternalTransform Khi không có trình bao bọc dành riêng cho SDK, bạn có thể truy cập biến đổi đa ngôn ngữ Python thông qua lớp 495 bằng cách chỉ định tên đủ điều kiện và đối số hàm tạo của biến đổi Python đích 50Sử dụng lớp bên ngoài
13. 2. 2. Sử dụng các biến đổi ngôn ngữ chéo trong một đường dẫn PythonNếu có trình bao bọc dành riêng cho Python cho biến đổi ngôn ngữ chéo, hãy sử dụng trình bao bọc đó. Mặt khác, bạn phải sử dụng lớp cấp thấp hơn để truy cập biến đổi Sử dụng trình bao bọc SDK Để sử dụng chuyển đổi ngôn ngữ chéo thông qua trình bao bọc SDK, hãy nhập mô-đun cho trình bao bọc SDK và gọi nó từ quy trình của bạn, như minh họa trong ví dụ 51Sử dụng lớp ExternalTransform Khi không có trình bao bọc dành riêng cho SDK, bạn sẽ phải truy cập vào chuyển đổi ngôn ngữ chéo thông qua lớp 482
Using the JavaExternalTransform class Python has the ability to invoke Java-defined transforms via as if they were Python transforms. These are invoked as follows 53Python’s 517 method can be used if the method names in java are reserved Python keywords such as 518As with other external transforms, either a pre-started expansion service can be provided, or jar files that include the transform, its dependencies, and Beam’s expansion service in which case an expansion service will be auto-started 13. 2. 3. Using cross-language transforms in a Go pipelineIf a Go-specific wrapper for a cross-language is available, use that. Otherwise, you have to use the lower-level function to access the transform Expansion Services The Go SDK supports automatically starting Java expansion services if an expansion address is not provided, although this is slower than providing a persistent expansion service. Many wrapped Java transforms manage perform this automatically; if you wish to do this manually, use the 519 package’s function. In order to use Python cross-language transforms, you must manually start any necessary expansion services on your local machine and ensure they are accessible to your code during pipeline constructionSử dụng trình bao bọc SDK To use a cross-language transform through an SDK wrapper, import the package for the SDK wrapper and call it from your pipeline as shown in the example 54Using the CrossLanguage function Khi không có trình bao bọc dành riêng cho SDK, bạn sẽ phải truy cập vào chuyển đổi ngôn ngữ chéo thông qua chức năng 520
13. 2. 4. Sử dụng các biến đổi ngôn ngữ chéo trong một đường ống Bản mô tảViệc sử dụng trình bao bọc Bản mô tả cho đường dẫn ngôn ngữ chéo cũng tương tự như sử dụng bất kỳ biến đổi nào khác, miễn là các phụ thuộc (e. g. một trình thông dịch Python gần đây hoặc Java JRE) có sẵn. Ví dụ: hầu hết các IO Bản mô tả chỉ đơn giản là các trình bao bọc xung quanh các biến đổi Beam từ các ngôn ngữ khác Nếu trình bao bọc chưa có sẵn, người ta có thể sử dụng nó một cách rõ ràng bằng cách sử dụng apache_beam. biến đổi. bên ngoài. rawExternalTransform. lấy `urn` (chuỗi xác định biến đổi), `payload` (đối tượng nhị phân hoặc json tham số hóa biến đổi) và `expansionService` có thể là địa chỉ của dịch vụ bắt đầu trước hoặc trả về có thể gọi được Ví dụ, người ta có thể viết 56Lưu ý rằng 522 phải có bộ mã hóa tương thích đa ngôn ngữ, chẳng hạn như 523. Điều này có thể được đảm bảo với các biến đổi withCoderInternal hoặc withRowCoder, e. g 57Coder cũng có thể được chỉ định trên đầu ra nếu nó không thể được suy ra, e. g Ngoài ra, có một số tiện ích như pythonTransform giúp gọi các phép biến đổi từ các ngôn ngữ cụ thể dễ dàng hơn 58Chuyển đổi ngôn ngữ chéo cũng có thể được xác định theo dòng, có thể hữu ích để truy cập các tính năng hoặc thư viện không có sẵn trong SDK gọi 5913. 3. Hỗ trợ người chạyHiện tại, các trình chạy di động như Flink, Spark và trình chạy trực tiếp có thể được sử dụng với các đường dẫn đa ngôn ngữ Dataflow hỗ trợ các đường ống đa ngôn ngữ thông qua kiến trúc phụ trợ Dataflow Runner v2 13. 4 mẹo và khắc phục sự cốĐể biết thêm mẹo và thông tin khắc phục sự cố, hãy xem tại đây 14 Batched DoFnsThích ứng cho
Batched DoFns hiện là một tính năng chỉ dành cho Python Batched DoFns cho phép người dùng tạo các thành phần mô-đun, có thể kết hợp hoạt động theo lô gồm nhiều thành phần logic. Các DoFn này có thể tận dụng các thư viện Python được vector hóa, như numpy, scipy và pandas, hoạt động trên các lô dữ liệu để đạt hiệu quả 14. 1 Khái niệm cơ bảnBatched DoFns hiện là một tính năng chỉ dành cho Python Một Batched DoFn tầm thường có thể trông như thế này 60DoFn này có thể được sử dụng trong đường ống Beam hoạt động trên các phần tử riêng lẻ. Beam sẽ hoàn toàn đệm các phần tử và tạo các mảng có nhiều mảng ở phía đầu vào và ở phía đầu ra, nó sẽ phân tách các mảng có nhiều mảng thành các phần tử riêng lẻ 61Lưu ý rằng chúng tôi sử dụng để đặt kiểu chữ khôn ngoan cho đầu ra của 525. Sau đó, khi áp dụng 526 cho 73 này, Beam nhận ra rằng 528 là loại lô có thể chấp nhận được để sử dụng cùng với các phần tử 529. Chúng tôi sẽ sử dụng các gợi ý kiểu chữ gọn gàng như thế này trong suốt hướng dẫn này, nhưng Beam cũng hỗ trợ các gợi ý kiểu chữ từ các thư viện khác, hãy xemTrong trường hợp trước, Beam sẽ ngầm tạo và phát nổ các lô ở ranh giới đầu vào và đầu ra. Tuy nhiên, nếu Batched DoFns với các loại tương đương được xâu chuỗi lại với nhau, việc tạo và phát nổ hàng loạt này sẽ bị loại bỏ. Các lô sẽ được chuyển thẳng qua. Điều này làm cho việc tổng hợp hiệu quả các biến đổi hoạt động theo lô trở nên đơn giản hơn nhiều 6214. Dự phòng 2 phần tửBatched DoFns hiện là một tính năng chỉ dành cho Python Đối với một số DoFns, bạn có thể cung cấp cả triển khai theo đợt và theo từng phần tử theo logic mong muốn của mình. Bạn có thể làm điều này bằng cách xác định cả 30 và 531 63Khi thực hiện DoFn này, Beam sẽ chọn cách triển khai tốt nhất để sử dụng trong bối cảnh nhất định. Nói chung, nếu các đầu vào cho DoFn đã được theo đợt thì Beam sẽ sử dụng triển khai theo đợt; Lưu ý rằng, trong trường hợp này, không cần xác định 533. Điều này là do Beam có thể lấy loại đầu ra từ gợi ý đánh máy trên 3014. 3 Sản xuất hàng loạt so với. tiêu thụ hàng loạtBatched DoFns hiện là một tính năng chỉ dành cho Python Theo quy ước, Beam giả định rằng phương pháp 531, sử dụng các đầu vào theo đợt, cũng sẽ tạo ra các đầu ra theo đợt. Tương tự, Beam giả sử phương pháp 30 sẽ tạo ra các phần tử riêng lẻ. Điều này có thể được ghi đè bằng và trang trí. Ví dụ 6414. 4 loại lô được hỗ trợBatched DoFns hiện là một tính năng chỉ dành cho Python Chúng tôi đã sử dụng các loại numpy trong triển khai Batched DoFn trong hướng dẫn này – 539 làm gợi ý kiểu phần tử và 528 làm gợi ý kiểu lô tương ứng – nhưng Beam cũng hỗ trợ các gợi ý kiểu chữ từ các thư viện kháccục mịchGợi ý Kiểu phần tửGợi ý Kiểu hàng loạtCác kiểu số ( 01, 542, 543,…) np. ndarray (của Mảng Numpy)gấu trúcLoại phần tửgợi ýLoại hàng loạtgợi ýCác loại số ( 01, 542, 543,…) 547_______54_______548 057 550Other types?Nếu có các loại lô khác mà bạn muốn sử dụng với Batched DoFns, vui lòng gửi vấn đề 14. 5 loại đầu vào và đầu ra hàng loạt độngBatched DoFns hiện là một tính năng chỉ dành cho Python Đối với một số DoFns theo lô, có thể không đủ để khai báo các loại lô một cách tĩnh, với gợi ý đánh máy trên 30 và/hoặc 531. Bạn có thể cần phải khai báo động các kiểu này. Bạn có thể thực hiện việc này bằng cách ghi đè các phương thức và trên DoFn của mình 6514. 6 lô và ngữ nghĩa thời gian sự kiệnBatched DoFns hiện là một tính năng chỉ dành cho Python Hiện tại, các lô phải có một bộ thông tin thời gian duy nhất (thời gian sự kiện, cửa sổ, v.v.) áp dụng cho mọi thành phần logic trong lô. Hiện tại không có cơ chế để tạo các lô kéo dài nhiều dấu thời gian. Tuy nhiên, có thể truy xuất thông tin thời gian này trong triển khai Batched DoFn. Thông tin này có thể được truy cập bằng cách sử dụng các thuộc tính 555 thông thường |