• Przejdź do treści
  • Przejdź to drugiego menu
  • Przejdź do głównego paska bocznego
  • Przejdź do stopki
  • START
  • BLOG
  • NEWSLETTER
  • KIM JESTEM
  • KONTAKT
Cegładanych

Cegładanych

Dane - Databricks i Chmura Azura

  • Azure
  • Databricks
  • Spark
  • Etl
  • Engineering
  • AI

Tabele Delta jak działa płynne kastrowanie (Liquid Clustering)

23.11.2024 Krzysztof Nojman

liquid clustering

Partycjonowanie danych

Partycjonowanie powstało po to żeby posegregować dane. Jeśli masz milion plików i chcesz wyciągnąć konkretną informację, to chwilę będziesz musisz poczekać. I ta chwila może potrwać sporo czasu i do tego przepalisz sporo kasy. Tutaj wkracza partycjonowanie, czyli pogrupowanie danych według jakiegoś klucza.

Żeby dobrze dobrać partycję musisz wiedzieć jakie są twoje główne zapytania. Po jakich kluczach (kolumnach) najczęściej będziesz wyciągać dane. Być może pracujesz z danymi szeregów czasowych, więc partycjonujesz według roku, miesiąca i dnia. Może według klienta, a może czegoś innego. To zależy jak korzystasz z danych i co konkretnie będzie się znajdowało w zapytaniach. Dzięki partycjom Spark ma poukładane dane i wie gdzie ma szukać konkretnych informacji.

Musisz także zrozumieć, jak dane są fizycznie rozmieszczone i jakie mają wielkości. W przeciwnym razie może pojawić się „data skew„, oznacza to, że dane będą nierównomiernie rozmieszczone. Pamiętaj, że pracujesz w na danych rozproszonych więc jest to bardzo ważne żeby odpowiednio je poukładać. Partycje mogą się popsuć i w partycji nr 1 możesz mieć 400mb a w drugiej 700mb. To wpłynie na wydajność zapytań i to negatywnie😮

Tradycyjne partycjonowanie

Jeśli działasz w chmurze, a konkretnie w Databricks to powinieneś widzieć o kilku zasadach

  • Nie musisz partycjonować tabeli mniejszej niż 1TB.
  • W wersji runtime 11.2 partycjonowanie jest automatyczne po dacie zapisu danych („ingestion time”).

Kolejną rekomendacją jest wielkość samej partycji nie powinna być mniejsza niż 1GB. Ważne: jeśli robisz dużo modyfikacji przy użyciu UPDATE lub MERGE, Databricks zaleca uruchamianie OPTIMIZE przy użyciu kolumny zgodnej z kolejnością zapisu danych.

Hive partitioning

Działając w chmurze z narzędziami takimi jak Databricks czy Microsoft Fabric dane są przetrzymywane w formacie Delta, który to używa formatu Parquet do przechowywania danych. Apache Spark używa partycjonowania w stylu Hive podczas zapisywania danych w formacie Parquet. Partycjonowanie w stylu Hive nie jest częścią protokołu Delta Lake więc nie obsługuje tej strategii partycjonowania. Wiele funkcji Delta Lake łamie założenia dotyczące układu danych, które mogły zostać przesłane z protokołu Parquet, Hive lub nawet wcześniejszych wersji protokołu Delta Lake. Ważne, żeby przed wyborem sprawdzić jakie są opcje, i solidnie przetestować. Źle dobrane partycje mogą bardziej zaszkodzić niż pomóc.

Optimize i Z-Order

Dzięki indeksowi Z-order partycjonowanie działa jeszcze szybciej. Zmienia on kolejność danych w oparciu o konkretną kolumnę, przez co Spark wie gdzie są dane i nie musi czytać wszystkich plików (przykład poniżej). Jeśli te nowe Liquid Clustering działa lepiej to czeka cię migracja i z-order odejdzie do lamusa.

OPTIMIZE – uporządkowuje pliki w tabeli Delta zmieniając ich kolejność. Dzięki temu Spark nie traci czasu na przeglądanie danych co przyspiesza wykonywanie zapytań. Podczas tej operacji małe pliki są łączone w większe.

Pamiętaj, że ta operacja może trochę potrwać, to zależy od ilości danych oraz wielkości plików. Po szczegóły polecam artykuł Seequality. Na produkcji OPTYMIZE uruchamia się według jakiegoś harmonogramu czyli raz na jakiś czas, może to być po skończonym zasilania. Jak zwykle diabeł tkwi w szczegółach więc sprawdź zanim coś ustawisz.

Wspomnę, krótko o VACUUM, jest to operacja, która usuwa puste/popsute pliki. Dane są ciągle dodawane i przez to jest ich za dużo i niektóre stają się niepotrzebne. Ta operacje usunie zbędne pliki a tym samym przyspieszy procesy.

Płynne klastrowanie (Liquid Clastering)

Delta Lake Liquid Clustering zastępuje tradycyjne partycjonowanie i ZORDER żeby uprościć fizyczne rozmieszczenie danych i oczywiście przyspieszyć zapytania. Jak pisze Databricks w dokumentacji LC jest elastyczne jeśli chodzi o definiowanie kluczy klastrowania, bez konieczności przepisywania istniejących danych. Klucze które ustawiasz maja wpływ na to jak rozmieszczone są dane. Są one poukładane według kluczy czyli indexowane i posortowane.

Ich przewagą na indexami Hive czy Z-order jest brak konieczności przepisywania danych w momencie zmiany klucza.

Jest tutaj ważna uwaga – w najnowszych wersjach runtime > 14.2 dostępna jest funkcjonalność współbieżności (row-level concurrency). Podczas zapisu przez współbieżne operacje może dojść do konfliktu. W tym samym czasie różne procesy mogą zapisywać i odczytywać te same dane. Żeby zapobiec konfliktom możesz wybrać poziom izolacji i zapobiec awarii.

Konflikty

Poniższa tabele opisuje kiedy mogą wystąpić konflikty.

Konflikty zapisu z row-level concurency

INSERTUPDATE, DELETE, MERGE INTOOPTIMIZE
INSERTNie może być w konflikcie
UPDATE, DELETE, MERGE INTONie może powodować konfliktu w WriteSerializable. Może powodować konflikt w Serializable podczas modyfikowania tego samego wiersza; zobacz Ograniczenia współbieżności na poziomie wiersza .MERGE INTO wymaga Photona do rozwiązywania konfliktów na poziomie wiersza. Może powodować konflikt podczas modyfikowania tego samego wiersza; zobacz Ograniczenia współbieżności na poziomie wiersza .
OPTIMIZENie może być w konflikcieNie może być w konflikcieNie może być w konflikcie

Konflikty zapisu bez row-level concurency

INSERTUPDATE, DELETE, MERGE INTO OPTIMIZE
INSERTNie może być w konflikcie
UPDATE, DELETE, MERGE INTONie może powodować konfliktu w WriteSerializable. Może powodować konflikty w serializowalności; zobacz unikanie konfliktów z partycjami .Może powodować konflikty w Serializable i WriteSerializable; zobacz unikanie konfliktów z partycjami .
OPTIMIZENie może być w konflikcieNie może powodować konfliktów z tabelami z włączonymi wektorami usuwania. Inaczej może kolidować.Nie może powodować konfliktów z tabelami z włączonymi wektorami usuwania. Inaczej może kolidować.


Co warto wiedzieć o Liquid Clustering?

  • Potrzebujesz nowej wersji DBR runtime >13.3, aby wykonywać wszystkie operacje zapisu/odczytu.
  • Tylko do 4 kolumn można utworzyć klaster.
  • Tylko kolumny, dla których dostępne są statystyki, można sklastrować.
  • Nie wszystkie operacje DML (merge, etc.) klastrują dane podczas zapisu… dlatego często używaj OPTIMIZE.

Kiedy stosować

  • Tabele często filtrowane według kolumn o wysokiej kardynalności.
  • Tabele ze znaczną niesymetrycznością w dystrybucji danych (data skew).
  • Tabele, które szybko rosną i wymagają częstych optymalizacji.
  • Tabele z wymaganiami dotyczącymi współbieżnego zapisu.
  • Tabele ze wzorcami dostępu, które zmieniają się w czasie.
  • Tabele, w których typowy klucz partycji może pozostawić tabelę ze nieoptymalną ilością partycji, za dużo źle i za mało też.

Jak już włączysz ten typ klastrowania to musisz pamiętać o tym żeby regularnie wykonać OPTIMIZE

OPTIMIZE <table name>

Zawody Nowoczesny Liquid Clastering vs Tradycja

Postaram się sprawdzić w Databricks community edition jak to działa. Wiem że do solidnego testu potrzeba sporo danych, ale na razie muszę się posiłkować tym co mam. Skromnie, ale warto sprawdzić jakie będą osiągi przy tak małym klastrze. Podkreślam, zanim wrzucimy coś takiego na produkcję trzeba znacznie dokładniej przetestować.

Na odmianę użyłem innych danych niż te z Databricks, podkradłem je z AWS, wiem jestem nicpoń.😁😁 jest tam trochę danych więc warto korzystać. AWS Landsat to dane satelitarne, NASA ma swoje gadżety, które krążą nad naszą śliczną planetą i zbierają różne ciekawe dane. Jak już rozmawiamy o danych to AWS ma sporo danych i jeśli szukasz czegoś ciekawego do nauki to polecam przejrzeć.

Trochę kodu

Tworzę tabele

%sql
CREATE TABLE landsat_partitions
(
    entityld STRING,
    acquisitionDate TIMESTAMP,
    cloudCover FLOAT,
    processingLevel STRING,
    path INT,
    row INT,
    min_lat FLOAT,
    min_lon FLOAT,
    max_lat FLOAT,
    max_lon FLOAT,
    downioad_url STRING,
    year INT,
    month INT
)
USING DELTA
PARTITIONED BY (year,month);

CREATE TABLE landsat_clustering
(
  entityld STRING,
  acquisitionDate TIMESTAMP,
  cloudCover FLOAT,
  processingLevel STRING,
  path INT,
  row INT,
  min_lat FLOAT,
  min_lon FLOAT,
  max_lat FLOAT,
  max_lon FLOAT,
  downioad_url STRING,
  year INT,
  month INT
)
USING DELTA
CLUSTER BY (year,month);

CREATE TABLE landsat
(
    entityld STRING,
    acquisitionDate TIMESTAMP,
    cloudCover FLOAT,
    processingLevel STRING,
    path INT,
    row INT,
    min_lat FLOAT,
    min_lon FLOAT,
    max_lat FLOAT,
    max_lon FLOAT,
    downioad_url STRING,
    year INT ,
    month INT
)
USING DELTA;

Wczytuję dane

df = (spark.read.format("csv")
        .option("header" ,"true")
        .load("/FileStore/tables/Files/scene_list.gz"))

Trochę porządkowania danych

from  pyspark.sql.functions import col, to_timestamp, year, month

df = (df.withColumn("acquisitionDateStr",to_timestamp('acquisitionDate','yyyy-MM-dd HH:mm:ss.SSSSSS'))
      .withColumn("acquisitionDate",to_timestamp('acquisitionDateStr','yyyy-MM-dd HH:mm:ss'))
      .withColumn('year',year(col('acquisitionDate')))
      .withColumn('month',month(col('acquisitionDate')))
      .withColumn("cloudCover",col("cloudCover").cast("float"))
      .withColumn("path",col("path").cast("int"))
      .withColumn("row",col("row").cast("int"))
      .withColumn("min_lat",col("min_lat").cast("float"))
      .withColumn("min_lon",col("min_lon").cast("float"))
      .withColumn("max_lat",col("max_lat").cast("float"))
      .withColumn("max_lon",col("max_lon").cast("float"))
      .drop("acquisitionDateStr","acquisitionDateNew"))
df.createOrReplaceTempView("LandsatData")

Ładuję dane do tabel

# landsat_partitions landsat_clustering landsat
spark.sql("""INSERT INTO landsat_partitions
        SELECT
        entityId,
        acquisitionDate,
        cloudCover,
        processingLevel,
        path,
        row,
        min_lat,
        min_lon,
        max_lat,
        max_lon,
        download_url,
        year,
        month
        FROM LandsatData
""")

Przykładowe zapytanie

%sql
SELECT
  year,
  month,
  COUNT(*) AS total_images,
  AVG(cloudCover) AS average_cloudCover,
  MIN(cloudCover) AS min_cloudCover,
  MAX(cloudCover) AS max_cloudCover,
  processingLevel,
  ROUND(AVG(min_lat), 2) AS avg_min_lat,
  ROUND(AVG(min_lon), 2) AS avg_min_lon,
  ROUND(AVG(max_lat), 2) AS avg_max_lat,
  ROUND(AVG(max_lon), 2) AS avg_max_lon,
  COUNT(CASE WHEN cloudCover < 0.2 THEN 1 END) AS clear_images_count,
  MAX(max_lat) - MIN(min_lat) AS latitude_range,
  MAX(max_lon) - MIN(min_lon) AS longitude_range
FROM
  landsat_partitions
GROUP BY
  year,
  month,
  processingLevel
ORDER BY
  total_images DESC;
Liczba wierszyTradycyjne PartycjeLiquid clusteringDelta
1 Milion przed OPTYMIZE16.956.504.39
1 Milion z OPTYMIZE10.123.042.50
3 Miliony przed OPTYMIZE17.774.275.03
3 Miliony z OPTYMIZE6.612.712.83

Podsumowanie

Wyniki twoich testów mogą być inne, nie oczekuj że będą w 100% zgodne z moimi. Sporo można się z tego dowiedzieć. To jest dość wąski test, ale pokazuje, że LC jest dwa razy szybszy od tradycyjnych partycji. Warto zwrócić uwagę na Delta, bez specjalnych bajerów i tak śmiga nie gorzej od LC. Przy tak małej ilości danych czysta Delta wydaje się świetnie działać, i potwierdza słowa Databricks, że nie warto partycjonować małych tabel. Tego typu testy trzeba wykonać na znacznie większym zbiorze danych i kilku typach zapytań. Załączam notatnik żeby ułatwić sobie życie.

Dobra rada nie ufaj technologii i testuj wszystko co widzisz. Zanim zaczniemy coś stosować na produkcji dobrze wiedzieć jak działa dana technologia, żeby nie strzelić sobie w stopę 😀

Notatnik

W kategorii:Spark

Big Data ebook
Subskrybuj
Powiadom o
guest

guest

0 Komentarze
Najstarsze
Najnowsze Najwięcej głosów
Opinie w linii
Zobacz wszystkie komentarze

Pierwszy panel boczny

O MNIE

Narzędzia i dobre procesy do przetwarzania danych to podstawa sukcesu i wartości dla firmy. Czytaj więcej…

big data ebook

Ostatnie wpisy

spark joins

Jak Spark robi join?

13.01.2025 By Krzysztof Nojman

Czy JSON to samo zło

04.01.2025 By Krzysztof Nojman

VS Code nowości AI 

09.12.2024 By Krzysztof Nojman

Linki społecznościowe

  • Facebook
  • GitHub
  • LinkedIn
  • YouTube

Wyszukiwanie

Footer

Najnowsze wpisy

  • Jakość danych w Databricks DQX
  • Jak Spark robi join?
  • Czy JSON to samo zło
  • VS Code nowości AI 
  • Lista narzędzi AI dla każdego inżyniera, które warto znać
  • Kilka pomysłów na konfigurację Databricks
  • Co pamięta wykonawca (executor🧠)

Tagi

AI Apache Spark Architektura Azure BIg Data Certyfikat cloud Databricks Data Factory Dataframe DQX ETL Hurtownia Danych Intellij IoT Jaka technologia Join Kod Konfiguracja lakehouse Narzędzia Optymalizacja pyspark Spark Windows 10 zadania

Informacje Prawne

To jest nudna część lecz wymagana, wszystkie notki prawne o stronie znajdziecie tutaj.

Polityka Prywatności

Regulamin

Copyright © 2025 · Wszelkie prawa zastrzeżone. Krzysztof Nojman

wpDiscuz