• Przejdź do treści
  • Przejdź to drugiego menu
  • Przejdź do głównego paska bocznego
  • Przejdź do stopki
  • START
  • BLOG
  • NEWSLETTER
  • KIM JESTEM
  • KONTAKT
Cegładanych

Cegładanych

Dane - Databricks i Chmura Azura

  • Azure
  • Databricks
  • Spark
  • Etl
  • Engineering
  • AI

Czy można wykonać notatniki Databricks równolegle

23.11.2024 Krzysztof Nojman

notatniki databricks

Jak wiesz Spark jest stworzony do przetwarzania równoległego. Nie o tym jednak chcę dzisiaj napisać, ale o równoległym wykonaniu notatników Databricks. Stanąłem oko w oko przed problemem orkiestracji wielu kolekcji danych. I pojawił się problem ich ilości, w tradycyjnym podejściu musiałbym uruchomić kilkadziesiąt notatników jeden po drugim. Co nie jest zbyt efektywne.

Mam kilkanaście kolekcji do przeładownia, a każda z kolekcji ma po kilka do kilkunastu elementów. Wiec nie chciałem tego przetwarzać szeregowo, bo klaster będzie się męczył, a ja muszę na to patrzeć i czekać, a tego nie lubię.

Wątki

Wyobraź sobie, że musisz przetwarzać dane dla kilkunastu miast w Polsce. Ważne jest, aby traktować każde miasto jako całość. Będziesz przetwarzać całą listę kolekcji mającą powiązania hierarchicznie (rodzic, dziecko). Chciałbyś to robić jak najszybciej i nie przetwarzać jednego miasta po drugim, lecz wszystkie naraz. Jeśli jest problem z przetwarzaniem w jednym z zadań na poziomie dzielnic, to cała kolekcja powinna zostać wstrzymana. Pojawia się pytanie jak uruchomić notatniki równolegle, czy jest mechanizm, który to ogarnie. Na szczęście jest 🙂 tutaj podpowiedź w dokumentacji. Trzeba użyć wielu wątków, czyli Threads lub Futures. Jest to dostępne w obu najważniejszych językach Python i Scala.

Całość przykładowego rozwiązania to dwa notatniki i pliki konfiguracyjne. do pobrania na GitHub.

Planowanie zadań

Na poziomie Sparka zadania mogą zostać zgrupowane („pools”) i każda grupa może dostać inną porcję zasobów. Tzn. że dla poszczególnej grupy możesz przypisać więcej zasobów. Ta metoda pozwala potraktować różne grupy z odmiennym priorytetem. W obrębie grupy zadania będą miały dostęp to tych samych zasobów i będą potraktowane jako równe.

Sprawdź ustawienia Sparka czy Fair Scheduler Pool jest ustawiony: spark.conf.get("spark.scheduler.mode") powinieneś otrzymać wartość 'FAIR’ jeśli nie to ją ustaw spark.conf.set("spark.scheduler.mode", "FAIR")

Nowe zadania są przekazywane do domyślnej puli. Pule zadań można ustawić, dodając „spark.scheduler.pool” do SparkContext w wątku, który je przesyła.

sc.setLocalProperty("spark.scheduler.pool", "pool1")

Ważne

Notatniki wykonane w ten sposób będą dzieliły zasoby klastra. W razie braku zasobów lub błędów w jednym notatniku może dojść do awarii wszystkich. Pamiętaj o tym. Ważne jest, abyś pamiętaj o kontekście, w jakim stosujesz rozwiązanie. To rozwiązanie może być efektywne w pewnych sytuacjach, ale nie w każdej. Jeśli okaże się, że rozwiązanie jest niestabilne można część np Miasta wykonać w ADF lub użyć Jobs API, a dzielnice wykonać równolegle w notatnikach.

Przykładowy plik konfiguracyjny

{
	"wersja": "1.0.0",
	"miasto": "Kraków",
	"dzielnice": [
		{
			"wejscie": "/mnt/miasta/krakow/Kazimierz",
			"wyjscie": "/mnt/miasta/przetworzone/krakow/Kazimierz",
			"dzielnica" : "Kazimierz"
			
		},
		{
			"wejscie": "/mnt/miasta/krakow/Łagiewniki",
			"wyjscie": "/mnt/miasta/przetworzone/krakow/Łagiewniki",
			"dzielnica" : "Łagiewniki"
			
		},
		{
			"wejscie": "/mnt/miasta/krakow/Dębniki",
			"wyjscie": "/mnt/miasta/przetworzone/krakow/Dębniki",
			"dzielnica" : "Dębniki"
		}
	
	]

}

Notatniki Databricks

Oto najważniejsza część kodu odpowiedzialnego za wywołanie notatników. Tutaj użyje dbutils.notebook.run(). Każde wykonanie notatnika to osobne zadanie (Jobs). Poniżej kod w scali odpowiedzialny za wykonanie zadań i mechanizm wielowątkowy.

//Uruchomi notatkini równolegle
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val pliki = dbutils.fs.ls("/Tmp/DataStore/dane")
val listsPlikow = pliki.map(_.path.replace("dbfs:","/dbfs"))
val argumenty = listsPlikow.map(x => Map("plikKonfiguracyjny" -> x))

val kontekst = dbutils.notebook.getContext()
val liczbaZadan = argumenty.size

implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(liczbaZadan))

try {
  val futures = argumenty.zipWithIndex.map {case (args, i) => 
  Future({
    dbutils.notebook.setContext(kontekst)
    
    sc.setLocalProperty("spark.scheduler.pool",s"pool${i % liczbaZadan}")
    println(args)
    dbutils.notebook.run("Zadania",timeoutSeconds = 0, args)
  })}
  
  Await.result(Future.sequence(futures),atMost = Duration.Inf)
} catch {
  case e: Exception => {
    
  }
} finally {
  executionContext.shutdownNow()
}

Notatnik Workflow

Python

Poniżej kod w Pythonie, zasada działania jest podobna jak w scali.

%python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

sciezka = "/Tmp/DataStore/dane"
notatnik = "/Zadania/"

def wykonajNotatnik(liczbaZadan, plik):
  dbutils.notebook.run(path = notatnik,
    timeout_seconds = 300, 
    arguments = {"plikKonfiguracyjny": plik})

pliki = dbutils.fs.ls(sciezka)

# liczbaZadan = pliki
nazwaPliku = [(p.path).replace("dbfs:","/dbfs") for p in pliki ]
with ThreadPoolExecutor() as executor:
  results = executor.map(wykonajNotatnik, nazwaPliku, nazwaPliku)

Drugi notatnik to przykład odczytu konfiguracji i pobrania parametrów.

dbutils.widgets.text("plikKonfiguracyjny","")
val plikKonfiguracyjny = dbutils.widgets.get("plikKonfiguracyjny")

import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods._

val konfiguracja = Source.fromFile(plikKonfiguracyjny).mkString
val parametry = parse(konfiguracja)

val dzieliceKonfiguracja = for {JArray(dzielnice) <- parametry} yield dzielnice.size
val liczbaDzielic = dzieliceKonfiguracja(0)
val wersja = (parametry \\ "wersja").values.asInstanceOf[String]
val nazwa = (parametry \\ "miasto").values.asInstanceOf[String]

//Tutaj możesz wykorzystać ten sam mechanizm i przetwarzać dane równolegle zgodnie z tym co jest w konfiguracji lub sekwencyjnie
val rng = (1 to liczbaDzielic)
rng.zipWithIndex.map {
    case (klucz,index) => 
  
    val dzielice = (parametry \\ "dzielnice")(index).values.asInstanceOf[Map[String,String]]
    val wejscie = dzielice("wejscie")
    val wyjscie = dzielice("wyjscie")
    val dzielnica = dzielice("dzielnica")

    println(s"${wejscie},${wyjscie},${dzielnica}")
}

Notatnik Zadania

Daj znać czy używałeś czegoś podobnego i gdzie pojawiły się problemy ?

W kategorii:Databricks Tagi:zadania

Big Data ebook
Subskrybuj
Powiadom o
guest

guest

5 Komentarze
Najstarsze
Najnowsze Najwięcej głosów
Opinie w linii
Zobacz wszystkie komentarze
karol
karol
3 lata temu

Szkoda ze nie wszystkie skrypty w python 🙁
Jest może jakiś konwenter scala to python ?

Zauważyłem ze odpalając cos w ThreadPoolExecutor() on nie zrzuca błędów nawet skladni

0
Odpowiedz
Krzysztof Nojman
Krzysztof Nojman
Autor
Odpowiedź do  karol
3 lata temu

Cześć Karol
Też chciałbym mieć taki konwerter, jak masz czas to może coś stworzysz 🙂
Jak miałem notatnik to musiałem wszystko ręcznie przepisywać.

0
Odpowiedz
karol
karol
Odpowiedź do  Krzysztof Nojman
3 lata temu

a jak obsłużyć błędy w ThreadPoolExecutor ?

0
Odpowiedz
Dominik
Dominik
2 lata temu

Cześć Krzysztof. Pisząc notatniki z użyciem składni delta live tables można utworzyć joba w którym notatniki to poszczególne taski uruchamiane równolegle 🙂

0
Odpowiedz
Krzysztof Nojman
Krzysztof Nojman
Autor
Odpowiedź do  Dominik
2 lata temu

Masz rację workflowy w Databricks są coraz mocniejsze i można robić bardzo ciekawą orkiestrację. To jest temat na osobny artykuł. Masz z tym jakieś problemy czy wszystko działa na medal?

0
Odpowiedz

Pierwszy panel boczny

O MNIE

Narzędzia i dobre procesy do przetwarzania danych to podstawa sukcesu i wartości dla firmy. Czytaj więcej…

big data ebook

Ostatnie wpisy

spark joins

Jak Spark robi join?

13.01.2025 By Krzysztof Nojman

Czy JSON to samo zło

04.01.2025 By Krzysztof Nojman

VS Code nowości AI 

09.12.2024 By Krzysztof Nojman

Linki społecznościowe

  • Facebook
  • GitHub
  • LinkedIn
  • YouTube

Wyszukiwanie

Footer

Najnowsze wpisy

  • Jakość danych w Databricks DQX
  • Jak Spark robi join?
  • Czy JSON to samo zło
  • VS Code nowości AI 
  • Lista narzędzi AI dla każdego inżyniera, które warto znać
  • Kilka pomysłów na konfigurację Databricks
  • Co pamięta wykonawca (executor🧠)

Tagi

AI Apache Spark Architektura Azure BIg Data Certyfikat cloud Databricks Data Factory Dataframe DQX ETL Hurtownia Danych Intellij IoT Jaka technologia Join Kod Konfiguracja lakehouse Narzędzia Optymalizacja pyspark Spark Windows 10 zadania

Informacje Prawne

To jest nudna część lecz wymagana, wszystkie notki prawne o stronie znajdziecie tutaj.

Polityka Prywatności

Regulamin

Copyright © 2025 · Wszelkie prawa zastrzeżone. Krzysztof Nojman

wpDiscuz