Partycjonowanie danych
Partycjonowanie powstało po to żeby posegregować dane. Jeśli masz milion plików i chcesz wyciągnąć konkretną informację, to chwilę będziesz musisz poczekać. I ta chwila może potrwać sporo czasu i do tego przepalisz sporo kasy. Tutaj wkracza partycjonowanie, czyli pogrupowanie danych według jakiegoś klucza.
Żeby dobrze dobrać partycję musisz wiedzieć jakie są twoje główne zapytania. Po jakich kluczach (kolumnach) najczęściej będziesz wyciągać dane. Być może pracujesz z danymi szeregów czasowych, więc partycjonujesz według roku, miesiąca i dnia. Może według klienta, a może czegoś innego. To zależy jak korzystasz z danych i co konkretnie będzie się znajdowało w zapytaniach. Dzięki partycjom Spark ma poukładane dane i wie gdzie ma szukać konkretnych informacji.
Musisz także zrozumieć, jak dane są fizycznie rozmieszczone i jakie mają wielkości. W przeciwnym razie może pojawić się „data skew„, oznacza to, że dane będą nierównomiernie rozmieszczone. Pamiętaj, że pracujesz w na danych rozproszonych więc jest to bardzo ważne żeby odpowiednio je poukładać. Partycje mogą się popsuć i w partycji nr 1 możesz mieć 400mb a w drugiej 700mb. To wpłynie na wydajność zapytań i to negatywnie😮
Tradycyjne partycjonowanie
Jeśli działasz w chmurze, a konkretnie w Databricks to powinieneś widzieć o kilku zasadach
- Nie musisz partycjonować tabeli mniejszej niż 1TB.
- W wersji runtime 11.2 partycjonowanie jest automatyczne po dacie zapisu danych („ingestion time”).
Kolejną rekomendacją jest wielkość samej partycji nie powinna być mniejsza niż 1GB. Ważne: jeśli robisz dużo modyfikacji przy użyciu UPDATE lub MERGE, Databricks zaleca uruchamianie OPTIMIZE przy użyciu kolumny zgodnej z kolejnością zapisu danych.
Hive partitioning
Działając w chmurze z narzędziami takimi jak Databricks czy Microsoft Fabric dane są przetrzymywane w formacie Delta, który to używa formatu Parquet do przechowywania danych. Apache Spark używa partycjonowania w stylu Hive podczas zapisywania danych w formacie Parquet. Partycjonowanie w stylu Hive nie jest częścią protokołu Delta Lake więc nie obsługuje tej strategii partycjonowania. Wiele funkcji Delta Lake łamie założenia dotyczące układu danych, które mogły zostać przesłane z protokołu Parquet, Hive lub nawet wcześniejszych wersji protokołu Delta Lake. Ważne, żeby przed wyborem sprawdzić jakie są opcje, i solidnie przetestować. Źle dobrane partycje mogą bardziej zaszkodzić niż pomóc.
Optimize i Z-Order
Dzięki indeksowi Z-order partycjonowanie działa jeszcze szybciej. Zmienia on kolejność danych w oparciu o konkretną kolumnę, przez co Spark wie gdzie są dane i nie musi czytać wszystkich plików (przykład poniżej). Jeśli te nowe Liquid Clustering działa lepiej to czeka cię migracja i z-order odejdzie do lamusa.

OPTIMIZE – uporządkowuje pliki w tabeli Delta zmieniając ich kolejność. Dzięki temu Spark nie traci czasu na przeglądanie danych co przyspiesza wykonywanie zapytań. Podczas tej operacji małe pliki są łączone w większe.
Pamiętaj, że ta operacja może trochę potrwać, to zależy od ilości danych oraz wielkości plików. Po szczegóły polecam artykuł Seequality. Na produkcji OPTYMIZE uruchamia się według jakiegoś harmonogramu czyli raz na jakiś czas, może to być po skończonym zasilania. Jak zwykle diabeł tkwi w szczegółach więc sprawdź zanim coś ustawisz.
Wspomnę, krótko o VACUUM, jest to operacja, która usuwa puste/popsute pliki. Dane są ciągle dodawane i przez to jest ich za dużo i niektóre stają się niepotrzebne. Ta operacje usunie zbędne pliki a tym samym przyspieszy procesy.
Płynne klastrowanie (Liquid Clastering)
Delta Lake Liquid Clustering zastępuje tradycyjne partycjonowanie i ZORDER żeby uprościć fizyczne rozmieszczenie danych i oczywiście przyspieszyć zapytania. Jak pisze Databricks w dokumentacji LC jest elastyczne jeśli chodzi o definiowanie kluczy klastrowania, bez konieczności przepisywania istniejących danych. Klucze które ustawiasz maja wpływ na to jak rozmieszczone są dane. Są one poukładane według kluczy czyli indexowane i posortowane.
Ich przewagą na indexami Hive czy Z-order jest brak konieczności przepisywania danych w momencie zmiany klucza.
Jest tutaj ważna uwaga – w najnowszych wersjach runtime > 14.2 dostępna jest funkcjonalność współbieżności (row-level concurrency). Podczas zapisu przez współbieżne operacje może dojść do konfliktu. W tym samym czasie różne procesy mogą zapisywać i odczytywać te same dane. Żeby zapobiec konfliktom możesz wybrać poziom izolacji i zapobiec awarii.
Konflikty
Poniższa tabele opisuje kiedy mogą wystąpić konflikty.
Konflikty zapisu z row-level concurency
| INSERT | UPDATE, DELETE, MERGE INTO | OPTIMIZE | |
|---|---|---|---|
| INSERT | Nie może być w konflikcie | ||
| UPDATE, DELETE, MERGE INTO | Nie może powodować konfliktu w WriteSerializable. Może powodować konflikt w Serializable podczas modyfikowania tego samego wiersza; zobacz Ograniczenia współbieżności na poziomie wiersza . | MERGE INTO wymaga Photona do rozwiązywania konfliktów na poziomie wiersza. Może powodować konflikt podczas modyfikowania tego samego wiersza; zobacz Ograniczenia współbieżności na poziomie wiersza . | |
| OPTIMIZE | Nie może być w konflikcie | Nie może być w konflikcie | Nie może być w konflikcie |
Konflikty zapisu bez row-level concurency
| INSERT | UPDATE, DELETE, MERGE INTO | OPTIMIZE | |
|---|---|---|---|
| INSERT | Nie może być w konflikcie | ||
| UPDATE, DELETE, MERGE INTO | Nie może powodować konfliktu w WriteSerializable. Może powodować konflikty w serializowalności; zobacz unikanie konfliktów z partycjami . | Może powodować konflikty w Serializable i WriteSerializable; zobacz unikanie konfliktów z partycjami . | |
| OPTIMIZE | Nie może być w konflikcie | Nie może powodować konfliktów z tabelami z włączonymi wektorami usuwania. Inaczej może kolidować. | Nie może powodować konfliktów z tabelami z włączonymi wektorami usuwania. Inaczej może kolidować. |
Co warto wiedzieć o Liquid Clustering?
- Potrzebujesz nowej wersji DBR runtime >13.3, aby wykonywać wszystkie operacje zapisu/odczytu.
- Tylko do 4 kolumn można utworzyć klaster.
- Tylko kolumny, dla których dostępne są statystyki, można sklastrować.
- Nie wszystkie operacje DML (merge, etc.) klastrują dane podczas zapisu… dlatego często używaj OPTIMIZE.
Kiedy stosować
- Tabele często filtrowane według kolumn o wysokiej kardynalności.
- Tabele ze znaczną niesymetrycznością w dystrybucji danych (data skew).
- Tabele, które szybko rosną i wymagają częstych optymalizacji.
- Tabele z wymaganiami dotyczącymi współbieżnego zapisu.
- Tabele ze wzorcami dostępu, które zmieniają się w czasie.
- Tabele, w których typowy klucz partycji może pozostawić tabelę ze nieoptymalną ilością partycji, za dużo źle i za mało też.
Jak już włączysz ten typ klastrowania to musisz pamiętać o tym żeby regularnie wykonać OPTIMIZE
OPTIMIZE <table name>
Zawody Nowoczesny Liquid Clastering vs Tradycja
Postaram się sprawdzić w Databricks community edition jak to działa. Wiem że do solidnego testu potrzeba sporo danych, ale na razie muszę się posiłkować tym co mam. Skromnie, ale warto sprawdzić jakie będą osiągi przy tak małym klastrze. Podkreślam, zanim wrzucimy coś takiego na produkcję trzeba znacznie dokładniej przetestować.
Na odmianę użyłem innych danych niż te z Databricks, podkradłem je z AWS, wiem jestem nicpoń.😁😁 jest tam trochę danych więc warto korzystać. AWS Landsat to dane satelitarne, NASA ma swoje gadżety, które krążą nad naszą śliczną planetą i zbierają różne ciekawe dane. Jak już rozmawiamy o danych to AWS ma sporo danych i jeśli szukasz czegoś ciekawego do nauki to polecam przejrzeć.
Trochę kodu
Tworzę tabele
%sql
CREATE TABLE landsat_partitions
(
entityld STRING,
acquisitionDate TIMESTAMP,
cloudCover FLOAT,
processingLevel STRING,
path INT,
row INT,
min_lat FLOAT,
min_lon FLOAT,
max_lat FLOAT,
max_lon FLOAT,
downioad_url STRING,
year INT,
month INT
)
USING DELTA
PARTITIONED BY (year,month);
CREATE TABLE landsat_clustering
(
entityld STRING,
acquisitionDate TIMESTAMP,
cloudCover FLOAT,
processingLevel STRING,
path INT,
row INT,
min_lat FLOAT,
min_lon FLOAT,
max_lat FLOAT,
max_lon FLOAT,
downioad_url STRING,
year INT,
month INT
)
USING DELTA
CLUSTER BY (year,month);
CREATE TABLE landsat
(
entityld STRING,
acquisitionDate TIMESTAMP,
cloudCover FLOAT,
processingLevel STRING,
path INT,
row INT,
min_lat FLOAT,
min_lon FLOAT,
max_lat FLOAT,
max_lon FLOAT,
downioad_url STRING,
year INT ,
month INT
)
USING DELTA;
Wczytuję dane
df = (spark.read.format("csv")
.option("header" ,"true")
.load("/FileStore/tables/Files/scene_list.gz"))
Trochę porządkowania danych
from pyspark.sql.functions import col, to_timestamp, year, month
df = (df.withColumn("acquisitionDateStr",to_timestamp('acquisitionDate','yyyy-MM-dd HH:mm:ss.SSSSSS'))
.withColumn("acquisitionDate",to_timestamp('acquisitionDateStr','yyyy-MM-dd HH:mm:ss'))
.withColumn('year',year(col('acquisitionDate')))
.withColumn('month',month(col('acquisitionDate')))
.withColumn("cloudCover",col("cloudCover").cast("float"))
.withColumn("path",col("path").cast("int"))
.withColumn("row",col("row").cast("int"))
.withColumn("min_lat",col("min_lat").cast("float"))
.withColumn("min_lon",col("min_lon").cast("float"))
.withColumn("max_lat",col("max_lat").cast("float"))
.withColumn("max_lon",col("max_lon").cast("float"))
.drop("acquisitionDateStr","acquisitionDateNew"))
df.createOrReplaceTempView("LandsatData")
Ładuję dane do tabel
# landsat_partitions landsat_clustering landsat
spark.sql("""INSERT INTO landsat_partitions
SELECT
entityId,
acquisitionDate,
cloudCover,
processingLevel,
path,
row,
min_lat,
min_lon,
max_lat,
max_lon,
download_url,
year,
month
FROM LandsatData
""")
Przykładowe zapytanie
%sql
SELECT
year,
month,
COUNT(*) AS total_images,
AVG(cloudCover) AS average_cloudCover,
MIN(cloudCover) AS min_cloudCover,
MAX(cloudCover) AS max_cloudCover,
processingLevel,
ROUND(AVG(min_lat), 2) AS avg_min_lat,
ROUND(AVG(min_lon), 2) AS avg_min_lon,
ROUND(AVG(max_lat), 2) AS avg_max_lat,
ROUND(AVG(max_lon), 2) AS avg_max_lon,
COUNT(CASE WHEN cloudCover < 0.2 THEN 1 END) AS clear_images_count,
MAX(max_lat) - MIN(min_lat) AS latitude_range,
MAX(max_lon) - MIN(min_lon) AS longitude_range
FROM
landsat_partitions
GROUP BY
year,
month,
processingLevel
ORDER BY
total_images DESC;
| Liczba wierszy | Tradycyjne Partycje | Liquid clustering | Delta |
|---|---|---|---|
| 1 Milion przed OPTYMIZE | 16.95 | 6.50 | 4.39 |
| 1 Milion z OPTYMIZE | 10.12 | 3.04 | 2.50 |
| 3 Miliony przed OPTYMIZE | 17.77 | 4.27 | 5.03 |
| 3 Miliony z OPTYMIZE | 6.61 | 2.71 | 2.83 |
Podsumowanie
Wyniki twoich testów mogą być inne, nie oczekuj że będą w 100% zgodne z moimi. Sporo można się z tego dowiedzieć. To jest dość wąski test, ale pokazuje, że LC jest dwa razy szybszy od tradycyjnych partycji. Warto zwrócić uwagę na Delta, bez specjalnych bajerów i tak śmiga nie gorzej od LC. Przy tak małej ilości danych czysta Delta wydaje się świetnie działać, i potwierdza słowa Databricks, że nie warto partycjonować małych tabel. Tego typu testy trzeba wykonać na znacznie większym zbiorze danych i kilku typach zapytań. Załączam notatnik żeby ułatwić sobie życie.
Dobra rada nie ufaj technologii i testuj wszystko co widzisz. Zanim zaczniemy coś stosować na produkcji dobrze wiedzieć jak działa dana technologia, żeby nie strzelić sobie w stopę 😀
