Walidacja schematu danych jest bardzo ważnym etapem, w każdym projekcie z danymi. Jest to klucz do sukcesu i należy go potraktować poważnie. Poniżej znajdziesz przykłady jak walidować schemat danych i jakie masz dostępne narzędzia w Apache Spark i Databricks. Oczywiście możesz zrobić znacznie więcej dla jakości danych, ale to są podstawy dla pierwszej wersji twojego rozwiązania.
Co to jest schemat danych
Schemat danych opisuje każdy detal dotyczący danych. Są to tzw. metadane. Jest to struktura danych opisująca ich charakterystykę. Pozwolę sobie przedstawić to na przykładzie. Prosta struktura danych jest reprezentowana przez plik csv. lub Excel. Są tam kolumny, gdzie każda reprezentuje pojedynczy typ danych. Dla każdej kolumny możemy wyszczególnić jej nazwę i typ. W zależności od rodzaju pliku możesz podać więcej atrybutów.
| Marka | Typ | Data Produkcji | Model |
| Mazda | Kombi | 15.04.2016 | 6 |
Schemat danych dla powyższego pliku jest następujący.
| Marka | String |
| Typ | String |
| Data Produkcji | Date |
| Model | Int |
W tym najprostszym przykładzie wiesz jakie typy danych mogą się znajdować w poszczególnej kolumnie. Jeśli w kolumnie Model, ktoś lub coś doda jakąś wartość o typie String, to podczas walidacji Spark pokaże wyjątek. W tym wypadku złe nie dostaną się na produkcję.
Kolejnym elementem, który możesz sprawdzić są nulle. Null oznacza brak jakichkolwiek wartości w poszczególnej kolumnie.
Podsumowując schemat danych, jest opisem tego jakie parametry powinny charakteryzować zestaw danych. Każda kolumna powinna dopuszczać konkretne dane o sprecyzowanych wartościach.
Po co walidować schemat danych
Odpowiedź jest prosta, ponieważ jakość jest bardzo ważna. Bardzo lubię produkty premium za ich solidność i niezawodność. Jakość jest tak ważna, że trzeba o nią walczyć. Sam mam alergię na tandetę 😁 i to nie tylko dotyczy produktów, używanych na co dzień takich, które możesz wziąć do ręki, ale i usług.
Za wszelką cenę nie możesz dopuścić złych danych na produkcję !!!! Myślę, że się domyślasz, jakie będą tego konsekwencje. Bez względu na skalę problemu zawsze będą negatywne dla firmy i dla Ciebie. Kadra zarządzająca będzie podejmować złe decyzje, a to może mieć bardzo poważne konsekwencje. Na dodatek będziesz siedzieć po godzinach i naprawiać te błędy.
Typy danych
Wszystkie dane jakie przechowujemy i przetwarzamy mają swoją specyficzną charakterystykę.
| Typ Danych | Opis |
| ByteType | Reprezentuje 1-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -128 do 127. |
| ShortType | Reprezentuje 2-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -32768 do 32767 |
| IntegerType | Reprezentuje 4-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -2147483648 do 2147483647 |
| LongType | Reprezentuje 8-bajtowe liczby całkowite ze znakiem. Zakres liczb wynosi od -9223372036854775808 do 9223372036854775807 |
| FloatType | Reprezentuje 4-bajtowe liczby zmiennoprzecinkowe o pojedynczej precyzji. |
| DoubleType | Reprezentuje 8-bajtowe liczby zmiennoprzecinkowe o podwójnej precyzji. |
| DecimalType | Reprezentuje liczby dziesiętne ze znakiem o dowolnej precyzji. Wspierany wewnętrznie przez java.math.BigDecimal. A BigDecimalskłada się z nieskalowanej wartości całkowitej o dowolnej precyzji i 32-bitowej skali całkowitej. |
| StringType | Reprezentuje wartości ciągu znaków. „Hello World” |
| VarcharType | Ma ograniczenie długości. Zapisywanie danych nie powiedzie się, jeśli wprowadzony ciąg znaków przekroczy limit długości. |
| BinaryType | Reprezentuje wartości sekwencji bajtów. |
| BooleanType | Reprezentuje wartości logiczne True False. |
| TimestampType | Reprezentuje wartości składające się z pól rok, miesiąc, dzień, godzina, minuta i sekunda, z lokalną strefą czasową sesji. Wartość znacznika czasu reprezentuje bezwzględny punkt w czasie. 2021-09-01 12:34:56 |
| DateType | Reprezentuje wartości zawierające wartości pól rok, miesiąc i dzień, bez strefy czasowej. 2021-09-01 |
| ArrayType | Reprezentuje wartości składające się na sekwencję elementów typu elementType. containsNullsłuży do wskazania, czy elementy w ArrayTypewartości mogą mieć nullwartości. [1, 2, 3, 4, 5] |
| MapType | Reprezentuje wartości składające się z zestawu par klucz-wartość. {„key1”: „value1”, „key2”: „value2”} |
| StructType | Reprezentuje wartości o strukturze opisanej przez sekwencję StructFields (fields). {„name”: „John”, „age”: 30, „city”: „New York”} |
Jakie mogą być konsekwencję złego doboru typu danych. W najprostszym przykładzie wpłynie to na osiągi oraz koszty magazynu danych. Poniżej tabelka porównująca ile danych zajmie 100 000 000 rzędów danych.
| Liczba rzędów | Typ | Rozmiar |
| 100 000 000 | Integer | 190.73 GB |
| 100 000 000 | Long | 381.47 GB |
Jak widzisz różnica jest znaczna, a to tylko jedna kolumna, jeśli źle dobierzesz typ danych możesz drogo za to zapłacić.
Kiedy walidować dane
Najczęściej możesz zastosować dwa modele walidacji on Read i on Write.
On Read
Jeśli zaczynasz przetwarzać dane i wiesz, że nie są najlepszej jakości, to niewątpliwie powinieneś je walidować na samym początku. Później, kiedy już zaczniesz przetwarzać, może być za późno na wyłapanie błędów. Poniżej przykład walidacji schematu pliku csv. Produkcyjnie pliki schematu trzymamy w repo i ładujemy np. przy pomocy narzędzi DevOps.
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql.functions import *
filePath = "dbfs:/FileStore/tables/Files/names.csv"
schemat = (StructType([
StructField("id", StringType(), False),
StructField("name", StringType(), False),
StructField("birth name", StringType(), False)
]))
# CSV
namesDf = (spark.read.format("csv")
.option("header", "true")
.schema(schemat)
.load(filePath))
# JSON
names = (spark.read.format("json")
.option("multiline","true")
.option("schema", schemat)
.load("dbfs:/FileStore/tables/file.json")
.withColumnRenamed("birth name","birth_name")
.withColumn("IdNum",monotonically_increasing_id()))
Bezpieczne ładowanie danych
Spark daje Ci możliwość pobrania danych z wielu źródeł tutaj przykłady najpopularniejszych formatów.
- CSV
- JSON
- Parquet
- ORC
- Połączenia JDBC / ODBC
- Zwykłe pliki tekstowe
W pracy z danymi ważne jest, żeby złe dane nie przedostały się ze źródła do celu. Jeśli chodzi o Apache Spark, to jest kilka metod dostępnych oprócz walidacji schematów. Są to tzw Read Modes. Pozwalają na obsłużenie błędów podczas odczytu danych i stanowią dodatkową weryfikację.
Typy
PERMISSIVE: Jeśli atrybuty nie mogą zostać wczytane Spark zamienia je na nule
DROPMALFORMED: wiersze są usuwane
FAILFAST: proces odczytu zostaje całkowicie zatrzymany
Na produkcji powinniśmy przekierować dane do osobnej ścieżki, gdzie zbudujemy osobny mechanizm przeładowanie błędnych danych. Taka kwarantanna dla danych.
df = (spark.read.format('json')
.option('schema', schemat)
.option('badRecordsPath', '/mnt/source/badrecords')
.load('dbfs:/FileStore/tables/file.json'))
df = (spark.read.format("json")
.option('schema', schemat)
.option ('mode','PERMISSIVE')
.load('dbfs:/FileStore/tables/file.json'))
df = (spark.read.format('json')
.option ('schema' , schemat)
.option ('mode','DROPMALFORMED')
.load('dbfs:/FileStore/tables/file.json'))
df = (spark.read.format('json')
.option('schema', schemat)
.option('mode','FAILFAST')
.load('dbfs:/FileStore/tables/file.json'))
On Write
Drugim etapem walidacji może być sprawdzenie schematu przy zapisie. To gwarantuje, że twoje transformacje działają, jak należy. Jeśli już skończyłeś przetwarzać dane, to powinieneś sprawdzić czy spełniają twoje wstępne wymagania. I do tego możesz sprawdzić schemat podczas zapisu.
Kiedy użyć odpowiednej metody?
To zależy od tego, co robisz. Dane możesz walidować na samym początku i/lub na końcu. Optymalnie powinieneś to robić na obu etapach procesu. Są od tego wyjątki. Jeśli pobierasz dane z produkcji, to oznacza, że są one dobrej jakości, a więc nie koniecznie musisz je walidować przy odczycie. Możesz ograniczyć się do walidacji Dataframe przy zapisie. Wszystko zależy od tego jaki masz cel i jakie są twoje wymagania. Użyj odpowiedniej metody do problemu.
Tutaj oprócz walidacji możesz użyć ’save modes’
- append: dodawanie danych do istniejących (Uwaga może dojść do duplikatów).
- overwrite: Nadpisanie danych (Uwaga stracisz co było do tej pory).
- errorifexists: Jeśli dane istnieją w danej ścieżce to pojawi się wyjątek.
- ignore: Jeśli dane istnieją to nic się nie wydarzy.
Poniżej przykład z walidacją schematu avro podczas zapisu.
schema_str = '''
{
"type": "record",
"name": "avro_schema",
"namespace": "avro",
"fields": [
{
"name": "birth_name",
"type": "string",
"doc": "Nazwisko rodowe"
},
{
"name": "id",
"type": "string",
"doc": "Kolumna z numerem referencyjnym"
},
{
"name": "name",
"type": "string",
"doc": "Nazwisko"
},
{
"name": "IdNum",
"type": "long",
"doc": "jakaś kolumna id"
}
],
"doc:": "Prosty schemat do testów"
}
'''
(names.write
.format("avro")
.option("avroSchema",schema_str)
.mode("overwrite")
.save('dbfs:/FileStore/tables/dane'))
Walidacja danych w Delta Lake
Jeśli pracujesz w chmurze (Azure, AWS lub GCP) to masz dostęp do mechanizmu zapewniającego walidację schematu podczas zapisu, co oznacza, że wszystkie dane w tabeli są sprawdzane pod kątem zgodności ze schematem tabeli docelowej.
Delta Lake
Delta Lake to zoptymalizowana warstwa magazynu, która stanowi podstawę do przechowywania danych i tabel na platformie Databricks. Odrobina kodu open source, ale czyni cuda i bardzo pomaga w pracy. Jest w pełni spójna z API Sparka i działa z operacjami typu batch i stream.
Zalety Delta Lake:
- Transakcje ACID
- Skalowalne metadane – radzi sobie z dużymi ilościami danych PB
- Podróż w czasie – możesz odczytać historyczne wersje tabeli (rollback, audit, reproduce data)
- Open Source
- Działa z batch i stream
- Walidacje schematu danych – radzi sobie ze zmianami schematu
- Pełny audyt
- Działa z API Scala, Python, SQL
Optymistyczna kontrola współbieżności
Usługa Delta Lake używa optymistycznej kontroli współbieżności, która zapewnić gwarancje transakcyjne między zapisami. W ramach tego mechanizmu zapisy działają na trzech etapach:
Odczyt: Odczytuje (w razie potrzeby) najnowszą dostępną wersję tabeli, i określa, które pliki należy zmodyfikować.
Zapis: etapy wszystkich zmian przez zapisanie nowych plików danych.
Zweryfikuj i zatwierdź: przed zatwierdzeniem zmian sprawdza, czy proponowane zmiany powodują konflikt z wszelkimi innymi zmianami. Jeśli nie ma konfliktów, wszystkie przygotowane zmiany są zatwierdzane jako nowa migawka w danej wersji. W rezultacie operacja zapisu zakończy się pomyślnie. Jeśli jednak występują konflikty, operacja zapisu kończy się niepowodzeniem z wyjątkiem współbieżnej modyfikacji, a nie uszkodzeniem tabeli, tak jak w przypadku operacji zapisu w tabeli Parquet.
Databricks sprawdzi następujące elementy:
- Dataframe nie może zawierać więcej kolumn niż jest w tabeli docelowej.
- Typy danych w poszczególnych kolumnach muszą pasować do tabeli docelowej.
- Nazwy kolumn muszą pasować nawet wielkość liter jest sprawdzana.
Dzięki temu mechanizmowi masz dodatkową gwarancję, że typy danych będą pasować do istniejącego modelu. Wszelkie niezgodności wywołają wyjątek i Spark nie zapisze danych.
Nie martw się, Delta Lake jest tak wypasiona, że umożliwia aktualizację schematu tabeli. Obsługiwane są następujące typy zmian:
- Dodawanie nowych kolumn (w dowolnych pozycjach)
- Zmiana kolejności istniejących kolumn
- Zmiana nazw istniejących kolumn
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Przykładowy notatnik
Daj znać czy to Ci pomogło a może o czymś zapomniałem?
