• Przejdź do treści
  • Przejdź to drugiego menu
  • Przejdź do głównego paska bocznego
  • Przejdź do stopki
  • START
  • BLOG
  • NEWSLETTER
  • KIM JESTEM
  • KONTAKT
Cegładanych

Cegładanych

Dane - Databricks i Chmura Azura

  • Azure
  • Databricks
  • Spark
  • Etl
  • Engineering
  • AI

Ten kod może Ci się przydać…

30.11.2024 Krzysztof Nojman

Notatnik Databricks

Jeśli pracujesz w środowisku Databricks, to najprawdopodobniej będziesz potrzebował zautomatyzować część funkcjonalności, żeby ułatwić sobie życie. Nie mówię tutaj tylko o produkcji, ale o pracy na devie. Przygotowałem trochę kodu, z którego często korzystam. Są to elementy przydatne może nie na co dzień, ale od Świeta 🙂 więc warto o nich pamiętać. Może i tobie się przyda.

Dbutils

Jest to funkcjonalność dostarczona przez Databricks i ułatwia prace w notebookach. Pozwala na użycie wielu funkcji. Podstawowe ich opcje to:

  • Praca z obiektami na koncie magazynu, czyli blob lub data lake.
  • Łączenie wielu notebooków i przekazywanie parametrów pomiędzy nimi.
  • Praca z sekretami secrets stosuje się je do przechowywania kluczy lub haseł.

Szczegóły znajdziesz w dokumentacji.

Przeglądanie zawartości konta magazynu

Jeśli chcesz wiedzieć, jakie masz dostępne pliki, to możesz wyświetlić pełną zawartość folderów. Działa w sytuacji, kiedy użyłeś funkcji mount i połączyłeś workspace z kontem magazynu (Blob Storage lub Data Lake). Przykład mount znajdziesz w kolejnym akapicie. Fs.ls wyświetli wszystkie pliki w danej ścieżce.

display(dbutils.fs.ls("/mnt/source/"))

Przeglądanie połączonych kont magazynu

Dzięki tej funkcji możesz zobaczyć wszystkie połączone dyski (kontenery lub konta magazynu). Szczegóły dotyczące mount najdziesz w akapicie poniżej.

display(dbutils.fs.mounts)

Uruchamiani innych notatników

Jeśli chcesz stworzyć 'workflow’ czyli połączyć kilka notatników w jeden proces to masz możliwość ich uruchomiania z poziomu komórki ’cmd’. Na temat tworzenia workflows zrobię osobny wpis. Do tego potrzebujesz dbutils.notebook.run Pierwszy parametr to nazwa notatnika, drugi to timeout a trzeci to lista parametrów (w formacie klucz, wartość), jakie możesz przekazać do uruchamianego notatnika.

dbutils.notebook.run("nazwa-notatnika", 60, {"argument": "value1", "argument2": "value2", ...})

Łączenia notatników

Databricks daje Ci możliwość uruchomienia dodatkowego notatnika, który ma dostęp do tych samych zmiennych co notatnik wywołujący. Działa to odmiennie niż w przypadku dbutils.notebook.run. gdzie musisz przekazać zmienną w postaci parametru, niestety bezpośredni odczyt wartości zmiennych nie jest możliwy. Tutaj z pomocą przychodzi %run, który uruchomi drugi notatnik, ale scope zmiennych jest 'globalny’ – oba notatniki widzą swoje zmienne.

%run "./Notatnik 2"

Ustawianie parametrów notatnika i ich pobieranie

Ta funkcja jest bardzo przydatna, kiedy chcesz przekazać zmienną do notatnika. Najczęściej korzystam z tego, kiedy używam Data Factory do uruchomienia notatnika w Databricks. W samym notatniku użyję widgets żeby odczytać wartość przekazywanego parametru i przypisać do zmiennej.

dbutils.widgets.text("argument","wartość","nazwa")
val przykład = dbutils.widgets.get("argument")

Mount Drive

Jeśli chcesz korzystać z danych zapisanych w magazynie danych tj. Blob storage lub Data Lake, to masz możliwość połączenia się bezpośrednio z Databricks workspace. Poniżej jest przykładowy kod, który stworzy mountPoint. Jeśli chcesz go wykorzystać, to musisz podać dane ze swojej grupy zasobów. Możesz połączyć się do całego konta magazynu i mieć dostęp do zawartości wszystkich kontenerów lub przypisać mountPoint tylko do jednego kontenera.

Ważne !!!
Nie powinno się udostępniać klucza SAS i do tego ścieżki do konta magazynu, wtedy każdy może się do niego dostać. Produkcyjnie powinniśmy ukryć wszystkie poufne informacje w key vault i użyć Databricks Secrets.

Poniżej przykład jak użyć mount z kluczem SAS, można też użyć klucza konta magazynu, ale to jest znacznie mniej bezpieczne. SAS ma datę ważności, po której wygaśnie jego ważność.

//mount disk
val storageAccount = "nazwakontamagazynu"
val container = "source"
val sasKey = "?sv=2020-08-04&ss=..........."

val mountPoint = s"/mnt/source"

val sourceString = s"wasbs://$container@$storageAccount.blob.core.windows.net/"
val confKey = s"fs.azure.sas.$container.$storageAccount.blob.core.windows.net"

// Sprawdzamy czy mount instnieje
try {
  if (!dbutils.fs.mounts.map(mnt => mnt.mountPoint).contains(mountPoint))
    dbutils.fs.mount(
     source = sourceString,
     mountPoint = mountPoint,
     extraConfigs = Map(confKey -> sasKey))
}
catch {
  case e: Exception =>
    println(s"ERROR: Cannot mount $mountPoint.")
}

JDBC i SQL Server

Poniżej znajdziesz przykład jak połączyć się z bazą w Azure i wykonać zapytanie TSQL. Zwrócone dane będą w Spark DataFrame i gotowe do użycia.

Ważne !!!

Jak w powyższym przykładzie powinienem użyć Databricks secrets, żeby trzymać nazwę użytkownika i hasło, ale nie jest to produkcja, więc mogę poszaleć 🙂

val user = "sqladminuser..."
val password = "Password..."
val jdbc = "jdbc:sqlserver://acmeservercentral.database.windows.net:1433;database=db_sqlmain;slProtocol=TLSv1"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"


val listOfTables = spark.read
  .format("jdbc")
  .option("url", jdbc)
  .option("user", user)
  .option("password", password)
  .option("driver", driver)
  .option("query",s"SELECT * FROM information_schema.tables'")
  .load()

Następny przykład zapisze Dataframe do bazy

listOfTables.write
  .format("jdbc")
  .option("url", jdbc)
  .option("dbtable","dbo.stg_tablename")
  .option("user", user)
  .option("password", password)
  .option("driver", driver)
  .option("batchssize",2000)
  .option("numPartitions",10)
  .mode("append")
  .save()

SQL-Spark-Connector dla SQL Server i Azure SQL

Niedawno pojawił się wysokowydajny łącznik, który umożliwia korzystanie z danych transakcyjnych w analizie Big Data i utrwala wyniki dla zapytań ad hoc lub raportowania. Łącznik umożliwia używanie dowolnej bazy danych SQL, lokalnej lub w chmurze, jako źródła danych wejściowych lub ujścia danych wyjściowych dla zadań Spark. Jest kompatybilny ze Spark 3.0. Od Lipcja jest w GA. Szczegóły na Github.

Są dwie wersje dostępne w Maven dla Scali 2.11 i 2.12

ConnectorMaven CoordinateScala Version
Spark 2.4.x compatible connnectorcom.microsoft.azure:spark-mssql-connector:1.0.22.11
Spark 3.0.x compatible connnectorcom.microsoft.azure:spark-mssql-connector_2.12:1.1.02.12

Poniżej przykładowy kod odczytu i zapisu, więcej na Github.

val server_name = "jdbc:sqlserver://acmeservercentral.database.windows.net"
val database_name = "db_sqlmain"
val url = server_name + ";" + "databaseName=" + database_name + ";"

val table_name = "Persons"
val username = "admin...."
val password = "Password.....!" 

//Odczyt
val jdbcDF = spark.read 
        .format("com.microsoft.sqlserver.jdbc.spark") 
        .option("url", url) 
        .option("dbtable", table_name) 
        .option("user", username) 
        .option("password", password)
        .load()

//Zapis

   dff.write 
     .format("com.microsoft.sqlserver.jdbc.spark") 
     .mode("append") 
     .option("url", url) 
     .option("dbtable", table_name) 
     .option("user", username) 
     .option("password", password) 
     .save()

Prosta metoda na schemat

Schemat danych jest dość ważny i dobrze mieć metodę, żeby go stworzyć i potem użyć podczas wczytywania danych. Jedna z prostych metod przedstawiam poniżej, oczywiście jest dość surowa i czasami trzeba schemat dostosować, ale zawsze ułatwia pracę.

W przypadku plików json czy csv ten kod wygeneruje schemat danych, który potem możesz użyć.

val file = spark.read.format("json")
.load("/mnt/source/shows.json")
file.schema.json
{"type":"struct","fields":[{"name":"_c0","type":"string","nullable":true,"metadata":{}}]}

Niby nic skomplikowanego ale zawsze pomoże.

Schemat możesz wczytać w pliku i załadować go jako `StructType`

import java.io.File
import java.nio.file.{Paths,Files}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.types._

val schemaFileName = Paths.get("/dbfs/mnt/source/shows-schema.json")
val schemaContent = new String(Files.readAllBytes(schemaFileName ), StandardCharsets.UTF_8)
val schema = DataType.fromJson(schemaContent).asInstanceOf[StructType]

Bezpieczne ładowanie danych

W pracy z danymi ważne jest, żeby złe dane nie przedostały się ze źródła do celu. I tutaj Spark daje Ci kilka opcji. Są to tzw 'Read Modes’, czyli opcje jakie masz podczas odczytu danych.

Typy

  • PERMISSIVE: Jeśli atrybuty nie mogą zostać wczytane Spark zamienia je na nule
  • DROPMALFORMED: wiersze są usuwane
  • FAILFAST: proces odczytu zostaje całkowicie zatrzymany

// Opcja 1 dane odrzucone podczas walidacji schematu zostaną zapisane w osobnym folderze
val filewithschema = spark.read.format("json")
.option("schema",schema)
.option("badRecordsPath", "/mnt/source/badrecords")
.load("/mnt/source/shows.json")

//Opcja 2
val filewithschema = spark.read.format("json")
.option("schema",schema)
.option("mode", "PERMISSIVE")
.load("/mnt/source/shows.json")

//Opcja 3
val filewithschema = spark.read.format("json")
.option("schema",schema)
.option("mode", "DROPMALFORMED")
.load("/mnt/source/shows.json")

// Opcja 4
val filewithschema = spark.read.format("json")
.option("schema",schema)
.option("mode", "FAILFAST")
.load("/mnt/source/shows.json")

Sprawdzanie Nulls

Nulle są ważne i dobrze jest widzieć ile ich jest. Dzięki sprytowi Sparka można łatwo sprawdzić, czy Dataframe ma nulle i ile ich jest w poszczególnych kolumnach. Poniżej przykład

import org.apache.spark.sql.functions.{col, when, count}
val countNulls = df.select(file.columns.map(c => {count(when(col(c).isNull,c)).alias(c)}): _*)
display(countNulls)

Daj znać czy jest to przydatne i czy masz coś ciekawego na swojej liście?

W kategorii:Databricks Tagi:Databricks, Kod

Big Data ebook
Subskrybuj
Powiadom o
guest

guest

0 Komentarze
Najstarsze
Najnowsze Najwięcej głosów
Opinie w linii
Zobacz wszystkie komentarze

Pierwszy panel boczny

O MNIE

Narzędzia i dobre procesy do przetwarzania danych to podstawa sukcesu i wartości dla firmy. Czytaj więcej…

big data ebook

Ostatnie wpisy

spark joins

Jak Spark robi join?

13.01.2025 By Krzysztof Nojman

Czy JSON to samo zło

04.01.2025 By Krzysztof Nojman

VS Code nowości AI 

09.12.2024 By Krzysztof Nojman

Linki społecznościowe

  • Facebook
  • GitHub
  • LinkedIn
  • YouTube

Wyszukiwanie

Footer

Najnowsze wpisy

  • Jakość danych w Databricks DQX
  • Jak Spark robi join?
  • Czy JSON to samo zło
  • VS Code nowości AI 
  • Lista narzędzi AI dla każdego inżyniera, które warto znać
  • Kilka pomysłów na konfigurację Databricks
  • Co pamięta wykonawca (executor🧠)

Tagi

AI Apache Spark Architektura Azure BIg Data Certyfikat cloud Databricks Data Factory Dataframe DQX ETL Hurtownia Danych Intellij IoT Jaka technologia Join Kod Konfiguracja lakehouse Narzędzia Optymalizacja pyspark Spark Windows 10 zadania

Informacje Prawne

To jest nudna część lecz wymagana, wszystkie notki prawne o stronie znajdziecie tutaj.

Polityka Prywatności

Regulamin

Copyright © 2025 · Wszelkie prawa zastrzeżone. Krzysztof Nojman

wpDiscuz