Snowflake là một trong số ít kho dữ liệu đám mây sẵn sàng cho doanh nghiệp mang lại sự đơn giản mà không phải hy sinh các tính năng. Nó tự động thay đổi quy mô, cả lên và xuống, để đạt được sự cân bằng phù hợp giữa hiệu suất so với. Giá cả. Tuyên bố nổi tiếng của Snowflake là nó tách máy tính khỏi bộ lưu trữ. Điều này rất quan trọng vì hầu hết mọi cơ sở dữ liệu khác, bao gồm cả Redshift, kết hợp cả hai, nghĩa là bạn phải xác định quy mô cho khối lượng công việc lớn nhất của mình và chịu chi phí đi kèm. Trong kịch bản này, chúng ta sẽ tìm hiểu cách tạo cơ sở dữ liệu trong Snowflake và tạo bảng, chèn dữ liệu nhiều hàng vào bảng và cập nhật hàng trong bảng
Truy cập dự án thời gian thực Snowflake để triển khai SCD
Yêu cầu hệ thống
Bước 1. Đăng nhập vào tài khoản
Chúng tôi cần đăng nhập vào tài khoản bông tuyết. Đi đến bông tuyết. com và sau đó đăng nhập bằng cách cung cấp thông tin đăng nhập của bạn. Thực hiện theo các bước ở trên chúng tôi đã cung cấp trong liên kết
Bước 2. Tạo cơ sở dữ liệu trong Snowflake
chúng ta có thể tạo nó theo hai cách. sử dụng câu lệnh CREATE DATABASE
Ghi chú. rằng bạn không cần tạo lược đồ trong cơ sở dữ liệu vì mỗi cơ sở dữ liệu được tạo trong Snowflake chứa một lược đồ công khai mặc định
Cú pháp của tuyên bố
create or replace database [database-name] ;
Ví dụ về tuyên bố
________số 8
Đầu ra của tuyên bố trên. như bạn có thể thấy, câu lệnh trên được chạy thành công trong hình bên dưới
Bước 3. Chọn cơ sở dữ liệu
Để chọn cơ sở dữ liệu mà bạn đã tạo trước đó, chúng ta sẽ sử dụng câu lệnh "use"
Cú pháp của tuyên bố
Use database [database-name];
Ví dụ về tuyên bố
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
0Bước 4. Tạo bảng trong Snowflake bằng cách sử dụng Tạo câu lệnh
Ở đây chúng ta sẽ tạo một bảng bằng cách sử dụng câu lệnh tạo như hình bên dưới. Nó tạo một bảng mới trong lược đồ hiện tại/được chỉ định hoặc thay thế một bảng hiện có
Cú pháp của tuyên bố
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
1Ví dụ về Tạo bảng
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
2Đầu ra của tuyên bố trên
Bước 5. Chèn dữ liệu một hàng vào bảng trong Snowflake bằng câu lệnh INSERT
Ở đây chúng ta sẽ chèn các hàng vào bảng bằng cách sử dụng câu lệnh chèn trong bảng khách hàng bông tuyết, câu lệnh Chèn là lệnh DDL [ngôn ngữ định nghĩa dữ liệu]. Điều đó có nghĩa là chúng tôi đang cập nhật bảng bằng cách chèn một hoặc nhiều hàng vào bảng
Cú pháp của lệnh
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
3Ví dụ về lệnh INSERT
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
4Như bạn thấy, lệnh trên đang chèn một hàng vào bảng khách hàng. Đầu ra của truy vấn trên
Để cập nhật nhiều hàng trong một bảng, bạn có thể sử dụng một câu lệnh
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
5, với mệnh đề #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
6 lọc các hàng bạn muốn cập nhậtĐể cập nhật một số lượng lớn hàng [i. e. , hàng chục nghìn hàng trở lên], chúng tôi khuyên bạn nên cập nhật lặp lại các tập hợp con của các hàng mà bạn muốn cập nhật, cho đến khi tất cả các hàng đã được cập nhật. Bạn có thể viết một tập lệnh để thực hiện việc này hoặc bạn có thể viết một vòng lặp vào ứng dụng của mình
Trang này cung cấp hướng dẫn về cách viết các vòng lặp cập nhật hàng loạt với mẫu thực thi các câu lệnh
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
7 và #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
5 ở các cấp độ khác nhau của vòng lặp lồng nhauCảnh báo
Thận trọng khi cập nhật hàng loạt các hàng từ các bảng có ràng buộc khóa ngoại và các hành động khóa ngoại rõ ràng
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
9. Để duy trì hiệu suất của #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
5 trên các bảng có tác vụ khóa ngoại, chúng tôi khuyên bạn nên sử dụng kích thước lô nhỏ hơn vì các hàng bổ sung được cập nhật do tác vụ #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
9 có thể làm cho vòng lặp lô chậm hơn đáng kểTrước khi bắt đầu
Trước khi đọc trang này, hãy làm như sau
- Tạo một cụm máy chủ CockroachDB hoặc bắt đầu một cụm cục bộ
Cài đặt Trình điều khiển hoặc Khung ORM
Đối với ví dụ trên trang này, chúng tôi sử dụng trình điều khiển Python
2#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
Kết nối với cơ sở dữ liệu
Chèn dữ liệu mà bây giờ bạn muốn cập nhật
Đối với ví dụ trên trang này, chúng tôi tải một cụm có cơ sở dữ liệu
0 và dữ liệu từ#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
1#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
Viết một vòng lặp cập nhật hàng loạt
Ở cấp cao nhất của vòng lặp trong ứng dụng của bạn hoặc trong tập lệnh, hãy thực thi truy vấn
7 trả về một loạt lớn các giá trị khóa chính cho các hàng mà bạn muốn cập nhật. Khi xác định truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Sử dụng mệnh đề
6 để lọc trên các cột xác định các hàng mà bạn muốn cập nhật. Mệnh đề này cũng sẽ lọc ra các hàng đã được cập nhật bởi các lần lặp trước của vòng lặp#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
5 lồng nhau#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Để có hiệu suất tối ưu, điều kiện đầu tiên của bộ lọc phải đánh giá giá trị khóa chính cuối cùng được trả về bởi truy vấn
5 cuối cùng đã được thực thi. Điều này thu hẹp quá trình quét của mỗi truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 thành ít hàng nhất có thể và duy trì hiệu suất của các bản cập nhật hàng theo thời gian#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Một điều kiện khác của bộ lọc sẽ đánh giá các giá trị cột được duy trì trong cơ sở dữ liệu để báo hiệu một hàng đã được cập nhật hay chưa. Điều này ngăn các hàng được cập nhật nhiều lần, trong trường hợp ứng dụng hoặc tập lệnh gặp sự cố và cần được khởi động lại. Nếu không có cách nào để phân biệt giữa một hàng đã cập nhật và một hàng chưa được cập nhật, bạn có thể cần thêm một cột mới vào bảng [e. g. ,
8]#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Để có hiệu suất tối ưu, điều kiện đầu tiên của bộ lọc phải đánh giá giá trị khóa chính cuối cùng được trả về bởi truy vấn
- Thêm mệnh đề
9 vào cuối truy vấn con lựa chọn hoặc chạy truy vấn lựa chọn trong một giao dịch chỉ đọc, riêng biệt với#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
50. Điều này giúp giảm tranh chấp giao dịch#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Sử dụng mệnh đề
51 để giới hạn số lượng hàng được truy vấn cho một tập hợp con các hàng mà bạn muốn cập nhật. Để xác định kích thước lô#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 tối ưu, hãy thử các kích thước khác nhau [10.000 hàng, 20.000 hàng, v.v. ] và theo dõi sự thay đổi về hiệu suất. Lưu ý rằng kích thước lô#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 này có thể lớn hơn nhiều so với kích thước lô của các hàng được cập nhật trong truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
5 tiếp theo#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Để đảm bảo rằng các hàng được quét hiệu quả trong truy vấn
5 tiếp theo, hãy bao gồm mệnh đề#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
56 trên khóa chính#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Sử dụng mệnh đề
Trong truy vấn
7, hãy viết một vòng lặp lồng nhau thực thi truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
5 trên các giá trị khóa chính được truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 trả về, theo lô nhỏ hơn kích thước lô#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 ban đầu. Khi xác định truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
5#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Sử dụng mệnh đề
6 để lọc tập hợp con của các giá trị khóa chính được trả về bởi truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 cấp cao nhất. Để xác định kích thước lô#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
5 tối ưu, hãy thử các kích thước khác nhau [1.000 hàng, 2.000 hàng, v.v. ] và theo dõi sự thay đổi về hiệu suất#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Đảm bảo rằng truy vấn
5 cập nhật cột cho biết hàng đã được cập nhật hay chưa. Cột này có thể khác với cột có giá trị bạn muốn cập nhật#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Thêm mệnh đề
66 vào cuối truy vấn trả về các giá trị khóa chính của các hàng đang được cập nhật. Mệnh đề#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
6 trong truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
7 cấp cao nhất sẽ lọc ra giá trị khóa chính của hàng cuối cùng đã được cập nhật, sử dụng các giá trị được trả về bởi truy vấn#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
5 cuối cùng được thực thi#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Nếu có thể, chúng tôi khuyên bạn nên thực hiện từng
5 trong một giao dịch riêng biệt#!/usr/bin/env python3 import psycopg2 import os import time def main[]: conn = psycopg2.connect[os.environ.get['DB_URI']] lastid = None while True: with conn: with conn.cursor[] as cur: cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"] if lastid: cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]] else: cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"] pkvals = list[cur] if not pkvals: return while pkvals: batch = pkvals[:2000] pkvals = pkvals[2000:] with conn: with conn.cursor[] as cur: cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]] print[cur.statusmessage] if not pkvals: lastid = cur.fetchone[][0] del batch del pkvals time.sleep[5] conn.close[] if __name__ == '__main__': main[]
- Sử dụng mệnh đề
Ví dụ
Giả sử rằng trong năm qua, bạn đã ghi lại hàng trăm nghìn lượt đi MovR trong một cụm được tải bằng cơ sở dữ liệu
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
0. Và giả sử rằng, trong tuần cuối cùng của tháng 12, bạn đã áp dụng chiết khấu 10% cho tất cả các khoản phí đi xe được lập hóa đơn cho người dùng, nhưng bạn không cập nhật bảng #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
72 để phản ánh các khoản chiết khấuĐể cập nhật bảng
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
72, bạn có thể tạo một vòng lặp cập nhật hàng loạt các hàng có liên quan của bảng #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
72 theo hướng dẫn truy vấn được cung cấp ở trênTrong trường hợp này, bạn cũng cần thêm một cột mới vào bảng
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
72 để báo hiệu một hàng đã được cập nhật hay chưa. Sử dụng cột này, truy vấn #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
7 cấp cao nhất có thể lọc ra các hàng đã được cập nhật, điều này sẽ ngăn không cho các hàng được cập nhật nhiều lần nếu tập lệnh gặp sự cốVí dụ: bạn có thể tạo một cột có tên là
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
77, thuộc kiểu dữ liệu #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
78biểu tượng/nút/bản sao
ALTER TABLE rides ADD COLUMN discounted BOOL DEFAULT false;
Ghi chú
Câu lệnh
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
79 thực hiện thay đổi lược đồ. Để biết thêm thông tin về cách hoạt động của các thay đổi lược đồ trực tuyến trong CockroachDB, hãy xem Thay đổi lược đồ trực tuyếnTrong Python, tập lệnh cập nhật hàng loạt có thể trông giống như sau
biểu tượng/nút/bản sao
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
Ở mỗi lần lặp lại, truy vấn
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
7 trả về các giá trị khóa chính của tối đa 10.000 hàng dữ liệu lịch sử phù hợp từ 5 giây trước, trong một giao dịch chỉ đọc. Sau đó, một vòng lặp lồng nhau sẽ lặp lại các giá trị khóa chính được trả về trong các lô nhỏ hơn gồm 2.000 hàng. Tại mỗi lần lặp của vòng lặp #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
5 lồng nhau, một lô hàng được cập nhật. Sau khi vòng lặp #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
5 lồng nhau cập nhật tất cả các hàng từ truy vấn lựa chọn ban đầu, một khoảng thời gian trễ đảm bảo rằng truy vấn lựa chọn tiếp theo đọc dữ liệu lịch sử từ bảng sau lần cập nhật cuối cùng của lần lặp cuối cùng #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
5Lưu ý rằng lần lặp cuối cùng của vòng lặp lồng nhau sẽ gán giá trị khóa chính của hàng cuối cùng được cập nhật cho biến
#!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
54. Truy vấn #!/usr/bin/env python3
import psycopg2
import os
import time
def main[]:
conn = psycopg2.connect[os.environ.get['DB_URI']]
lastid = None
while True:
with conn:
with conn.cursor[] as cur:
cur.execute["SET TRANSACTION AS OF SYSTEM TIME '-5s'"]
if lastid:
cur.execute["SELECT id FROM rides WHERE id > %s AND discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000", [lastid,]]
else:
cur.execute["SELECT id FROM rides WHERE discounted != true AND extract['month', start_time] = 12 AND extract['day', start_time] > 23 ORDER BY id LIMIT 10000"]
pkvals = list[cur]
if not pkvals:
return
while pkvals:
batch = pkvals[:2000]
pkvals = pkvals[2000:]
with conn:
with conn.cursor[] as cur:
cur.execute["UPDATE rides SET discounted = true, revenue = revenue*.9 WHERE id = ANY %s RETURNING id", [batch,]]
print[cur.statusmessage]
if not pkvals:
lastid = cur.fetchone[][0]
del batch
del pkvals
time.sleep[5]
conn.close[]
if __name__ == '__main__':
main[]
7 tiếp theo sử dụng biến này để giảm số hàng được quét theo số hàng được cập nhật trong lần lặp cuối cùng của vòng lặp