• Przejdź do treści
  • Przejdź to drugiego menu
  • Przejdź do głównego paska bocznego
  • Przejdź do stopki
  • START
  • BLOG
  • NEWSLETTER
  • KIM JESTEM
  • KONTAKT
Cegładanych

Cegładanych

Dane - Databricks i Chmura Azura

  • Azure
  • Databricks
  • Spark
  • Etl
  • Engineering
  • AI

Jak walidować schemat danych w Apache Spark

23.11.2024 Krzysztof Nojman

walidacja schematu danych

Walidacja schematu danych jest bardzo ważnym etapem, w każdym projekcie z danymi. Jest to klucz do sukcesu i należy go potraktować poważnie. Poniżej znajdziesz przykłady jak walidować schemat danych i jakie masz dostępne narzędzia w Apache Spark i Databricks. Oczywiście możesz zrobić znacznie więcej dla jakości danych, ale to są podstawy dla pierwszej wersji twojego rozwiązania.

Co to jest schemat danych

Schemat danych opisuje każdy detal dotyczący danych. Są to tzw. metadane. Jest to struktura danych opisująca ich charakterystykę. Pozwolę sobie przedstawić to na przykładzie. Prosta struktura danych jest reprezentowana przez plik csv. lub Excel. Są tam kolumny, gdzie każda reprezentuje pojedynczy typ danych. Dla każdej kolumny możemy wyszczególnić jej nazwę i typ. W zależności od rodzaju pliku możesz podać więcej atrybutów.

MarkaTypData ProdukcjiModel
MazdaKombi15.04.20166

Schemat danych dla powyższego pliku jest następujący.

MarkaString
TypString
Data ProdukcjiDate
ModelInt

W tym najprostszym przykładzie wiesz jakie typy danych mogą się znajdować w poszczególnej kolumnie. Jeśli w kolumnie Model, ktoś lub coś doda jakąś wartość o typie String, to podczas walidacji Spark pokaże wyjątek. W tym wypadku złe nie dostaną się na produkcję.

Kolejnym elementem, który możesz sprawdzić są nulle. Null oznacza brak jakichkolwiek wartości w poszczególnej kolumnie.

Podsumowując schemat danych, jest opisem tego jakie parametry powinny charakteryzować zestaw danych. Każda kolumna powinna dopuszczać konkretne dane o sprecyzowanych wartościach.

Po co walidować schemat danych

Odpowiedź jest prosta, ponieważ jakość jest bardzo ważna. Bardzo lubię produkty premium za ich solidność i niezawodność. Jakość jest tak ważna, że trzeba o nią walczyć. Sam mam alergię na tandetę 😁 i to nie tylko dotyczy produktów, używanych na co dzień takich, które możesz wziąć do ręki, ale i usług.

Za wszelką cenę nie możesz dopuścić złych danych na produkcję !!!! Myślę, że się domyślasz, jakie będą tego konsekwencje. Bez względu na skalę problemu zawsze będą negatywne dla firmy i dla Ciebie. Kadra zarządzająca będzie podejmować złe decyzje, a to może mieć bardzo poważne konsekwencje. Na dodatek będziesz siedzieć po godzinach i naprawiać te błędy.

Typy danych

Wszystkie dane jakie przechowujemy i przetwarzamy mają swoją specyficzną charakterystykę.

Typ DanychOpis
ByteTypeReprezentuje 1-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -128 do 127.
ShortTypeReprezentuje 2-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -32768 do 32767
IntegerTypeReprezentuje 4-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -2147483648 do 2147483647
LongTypeReprezentuje 8-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -9223372036854775808 do 9223372036854775807
FloatTypeReprezentuje 4-bajtowe liczby zmiennoprzecinkowe o pojedynczej precyzji.
DoubleTypeReprezentuje 8-bajtowe liczby zmiennoprzecinkowe o podwójnej precyzji.
DecimalTypeReprezentuje liczby dziesiętne ze znakiem o dowolnej precyzji. Wspierany wewnętrznie przez java.math.BigDecimal. A 
BigDecimalskłada się z nieskalowanej wartości całkowitej o dowolnej precyzji i 32-bitowej skali całkowitej.
StringTypeReprezentuje wartości ciągu znaków. „Hello World”
VarcharTypeMa ograniczenie długości. Zapisywanie danych nie powiedzie się, jeśli wprowadzony ciąg znaków przekroczy limit długości.
BinaryTypeReprezentuje wartości sekwencji bajtów.
BooleanTypeReprezentuje wartości logiczne True False.
TimestampTypeReprezentuje wartości składające się z pól rok, miesiąc, dzień, godzina, minuta i sekunda, z lokalną strefą czasową sesji. 
Wartość znacznika czasu reprezentuje bezwzględny punkt w czasie. 2021-09-01 12:34:56
DateTypeReprezentuje wartości zawierające wartości pól rok, miesiąc i dzień, bez strefy czasowej. 2021-09-01
ArrayTypeReprezentuje wartości składające się na sekwencję elementów typu elementType. containsNullsłuży do wskazania, czy elementy w 
ArrayTypewartości mogą mieć nullwartości. [1, 2, 3, 4, 5]
MapTypeReprezentuje wartości składające się z zestawu par klucz-wartość. {„key1”: „value1”, „key2”: „value2”}
StructTypeReprezentuje wartości o strukturze opisanej przez sekwencję StructFields (fields). {„name”: „John”, „age”: 30, „city”: „New York”}

Jakie mogą być konsekwencję złego doboru typu danych. W najprostszym przykładzie wpłynie to na osiągi oraz koszty magazynu danych. Poniżej tabelka porównująca ile danych zajmie 100 000 000 rzędów danych.

Liczba rzędówTypRozmiar
100 000 000Integer190.73 GB
100 000 000Long381.47 GB

Jak widzisz różnica jest znaczna, a to tylko jedna kolumna, jeśli źle dobierzesz typ danych możesz drogo za to zapłacić.

Kiedy walidować dane

Najczęściej możesz zastosować dwa modele walidacji on Read i on Write.

On Read

Jeśli zaczynasz przetwarzać dane i wiesz, że nie są najlepszej jakości, to niewątpliwie powinieneś je walidować na samym początku. Później, kiedy już zaczniesz przetwarzać, może być za późno na wyłapanie błędów. Poniżej przykład walidacji schematu pliku csv. Produkcyjnie pliki schematu trzymamy w repo i ładujemy np. przy pomocy narzędzi DevOps.

DataFrameReader

from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql.functions import *

filePath = "dbfs:/FileStore/tables/Files/names.csv"

schemat = (StructType([
    StructField("id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("birth name", StringType(), False)            
]))
# CSV
namesDf = (spark.read.format("csv")
          .option("header", "true")
          .schema(schemat)
          .load(filePath))
# JSON
names = (spark.read.format("json") 
    .option("multiline","true")
    .option("schema", schemat)
    .load("dbfs:/FileStore/tables/file.json")
    .withColumnRenamed("birth name","birth_name")
    .withColumn("IdNum",monotonically_increasing_id()))

Bezpieczne ładowanie danych

Spark daje Ci możliwość pobrania danych z wielu źródeł tutaj przykłady najpopularniejszych formatów.

  • CSV
  • JSON
  • Parquet
  • ORC
  • Połączenia JDBC / ODBC
  • Zwykłe pliki tekstowe

W pracy z danymi ważne jest, żeby złe dane nie przedostały się ze źródła do celu. Jeśli chodzi o Apache Spark, to jest kilka metod dostępnych oprócz walidacji schematów. Są to tzw Read Modes. Pozwalają na obsłużenie błędów podczas odczytu danych i stanowią dodatkową weryfikację.

Typy

PERMISSIVE: Jeśli atrybuty nie mogą zostać wczytane Spark zamienia je na nule

DROPMALFORMED: wiersze są usuwane

FAILFAST: proces odczytu zostaje całkowicie zatrzymany

Na produkcji powinniśmy przekierować dane do osobnej ścieżki, gdzie zbudujemy osobny mechanizm przeładowanie błędnych danych. Taka kwarantanna dla danych.

df = (spark.read.format('json')
    .option('schema', schemat)
    .option('badRecordsPath', '/mnt/source/badrecords')
    .load('dbfs:/FileStore/tables/file.json'))

df = (spark.read.format("json")
    .option('schema', schemat)
    .option ('mode','PERMISSIVE')
    .load('dbfs:/FileStore/tables/file.json'))

df = (spark.read.format('json')
    .option ('schema' , schemat)
    .option ('mode','DROPMALFORMED')
    .load('dbfs:/FileStore/tables/file.json'))

df = (spark.read.format('json')
    .option('schema', schemat)
    .option('mode','FAILFAST')
    .load('dbfs:/FileStore/tables/file.json'))

On Write

DataFrameWriter

Drugim etapem walidacji może być sprawdzenie schematu przy zapisie. To gwarantuje, że twoje transformacje działają, jak należy. Jeśli już skończyłeś przetwarzać dane, to powinieneś sprawdzić czy spełniają twoje wstępne wymagania. I do tego możesz sprawdzić schemat podczas zapisu.

Kiedy użyć odpowiednej metody?

To zależy od tego, co robisz. Dane możesz walidować na samym początku i/lub na końcu. Optymalnie powinieneś to robić na obu etapach procesu. Są od tego wyjątki. Jeśli pobierasz dane z produkcji, to oznacza, że są one dobrej jakości, a więc nie koniecznie musisz je walidować przy odczycie. Możesz ograniczyć się do walidacji Dataframe przy zapisie. Wszystko zależy od tego jaki masz cel i jakie są twoje wymagania. Użyj odpowiedniej metody do problemu.

Tutaj oprócz walidacji możesz użyć ’save modes’

  • append: dodawanie danych do istniejących (Uwaga może dojść do duplikatów).
  • overwrite: Nadpisanie danych (Uwaga stracisz co było do tej pory).
  • errorifexists: Jeśli dane istnieją w danej ścieżce to pojawi się wyjątek.
  • ignore: Jeśli dane istnieją to nic się nie wydarzy.

Poniżej przykład z walidacją schematu avro podczas zapisu.

schema_str = '''
{
  "type": "record",
  "name": "avro_schema",
  "namespace": "avro",
  "fields": [
    {
      "name": "birth_name",
      "type": "string",
      "doc": "Nazwisko rodowe"
    },
    {
      "name": "id",
      "type": "string",
      "doc": "Kolumna z numerem referencyjnym"
    },
    {
      "name": "name",
      "type": "string",
      "doc": "Nazwisko"
    },
    {
      "name": "IdNum",
      "type": "long",
      "doc": "jakaś kolumna id"
    }
  ],
  "doc:": "Prosty schemat do testów"
}
'''

(names.write
  .format("avro")
  .option("avroSchema",schema_str)
  .mode("overwrite")
  .save('dbfs:/FileStore/tables/dane'))

Walidacja danych w Delta Lake

Jeśli pracujesz w chmurze (Azure, AWS lub GCP) to masz dostęp do mechanizmu zapewniającego walidację schematu podczas zapisu, co oznacza, że wszystkie dane w tabeli są sprawdzane pod kątem zgodności ze schematem tabeli docelowej.

Delta Lake

Delta Lake to zoptymalizowana warstwa magazynu, która stanowi podstawę do przechowywania danych i tabel na platformie Databricks. Odrobina kodu open source, ale czyni cuda i bardzo pomaga w pracy. Jest w pełni spójna z API Sparka i działa z operacjami typu batch i stream.

Zalety Delta Lake:

  • Transakcje ACID
  • Skalowalne metadane – radzi sobie z dużymi ilościami danych PB
  • Podróż w czasie – możesz odczytać historyczne wersje tabeli (rollback, audit, reproduce data)
  • Open Source
  • Działa z batch i stream
  • Walidacje schematu danych – radzi sobie ze zmianami schematu
  • Pełny audyt
  • Działa z API Scala, Python, SQL

Optymistyczna kontrola współbieżności

Usługa Delta Lake używa optymistycznej kontroli współbieżności, która zapewnić gwarancje transakcyjne między zapisami. W ramach tego mechanizmu zapisy działają na trzech etapach:
Odczyt: Odczytuje (w razie potrzeby) najnowszą dostępną wersję tabeli, i określa, które pliki należy zmodyfikować.
Zapis: etapy wszystkich zmian przez zapisanie nowych plików danych.
Zweryfikuj i zatwierdź: przed zatwierdzeniem zmian sprawdza, czy proponowane zmiany powodują konflikt z wszelkimi innymi zmianami. Jeśli nie ma konfliktów, wszystkie przygotowane zmiany są zatwierdzane jako nowa migawka w danej wersji. W rezultacie operacja zapisu zakończy się pomyślnie. Jeśli jednak występują konflikty, operacja zapisu kończy się niepowodzeniem z wyjątkiem współbieżnej modyfikacji, a nie uszkodzeniem tabeli, tak jak w przypadku operacji zapisu w tabeli Parquet.

Databricks sprawdzi następujące elementy:

  • Dataframe nie może zawierać więcej kolumn niż jest w tabeli docelowej.
  • Typy danych w poszczególnych kolumnach muszą pasować do tabeli docelowej.
  • Nazwy kolumn muszą pasować nawet wielkość liter jest sprawdzana.

Dzięki temu mechanizmowi masz dodatkową gwarancję, że typy danych będą pasować do istniejącego modelu. Wszelkie niezgodności wywołają wyjątek i Spark nie zapisze danych.

Nie martw się, Delta Lake jest tak wypasiona, że umożliwia aktualizację schematu tabeli. Obsługiwane są następujące typy zmian:

  • Dodawanie nowych kolumn (w dowolnych pozycjach)
  • Zmiana kolejności istniejących kolumn
  • Zmiana nazw istniejących kolumn
(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Przykładowy notatnik

Walidacja schematu danych

Daj znać czy to Ci pomogło a może o czymś zapomniałem?

W kategorii:Spark

Big Data ebook
Subskrybuj
Powiadom o
guest

guest

0 Komentarze
Najstarsze
Najnowsze Najwięcej głosów
Opinie w linii
Zobacz wszystkie komentarze

Pierwszy panel boczny

O MNIE

Narzędzia i dobre procesy do przetwarzania danych to podstawa sukcesu i wartości dla firmy. Czytaj więcej…

big data ebook

Ostatnie wpisy

spark joins

Jak Spark robi join?

13.01.2025 By Krzysztof Nojman

Czy JSON to samo zło

04.01.2025 By Krzysztof Nojman

VS Code nowości AI 

09.12.2024 By Krzysztof Nojman

Linki społecznościowe

  • Facebook
  • GitHub
  • LinkedIn
  • YouTube

Wyszukiwanie

Footer

Najnowsze wpisy

  • Jakość danych w Databricks DQX
  • Jak Spark robi join?
  • Czy JSON to samo zło
  • VS Code nowości AI 
  • Lista narzędzi AI dla każdego inżyniera, które warto znać
  • Kilka pomysłów na konfigurację Databricks
  • Co pamięta wykonawca (executor🧠)

Tagi

AI Apache Spark Architektura Azure BIg Data Certyfikat cloud Databricks Data Factory Dataframe DQX ETL Hurtownia Danych Intellij IoT Jaka technologia Join Kod Konfiguracja lakehouse Narzędzia Optymalizacja pyspark Spark Windows 10 zadania

Informacje Prawne

To jest nudna część lecz wymagana, wszystkie notki prawne o stronie znajdziecie tutaj.

Polityka Prywatności

Regulamin

Copyright © 2025 · Wszelkie prawa zastrzeżone. Krzysztof Nojman

wpDiscuz