• Przejdź do treści
  • Przejdź to drugiego menu
  • Przejdź do głównego paska bocznego
  • Przejdź do stopki
  • START
  • BLOG
  • NEWSLETTER
  • KIM JESTEM
  • KONTAKT
Cegładanych

Cegładanych

Dane - Databricks i Chmura Azura

  • Azure
  • Databricks
  • Spark
  • Etl
  • Engineering
  • AI

Co powinieneś wiedzieć o Spark Dataframe

10.11.2021 Krzysztof Nojman

Spark Dataframe

Dataframe czyli ramka danych

Ramka danych jest obiektem istniejącym w pamięci RAM. Najłatwiej ją zobrazować jako tabelę, która posiada kolumny oraz rzędy danych. Każda kolumna tak jak w bazie danych posiada nazwę oraz typ danych. Dataframe jest kolekcją obiektu Row (RDD[Row]) i schematu. Taka 'tabelka’ w pamięci ma bardzo dużo zalet dla analityka. Łatwo z nią pracować, a dodatkowo masz całą masę dostępnych funkcji do pracy na wierszach i kolumnach. Dataframe są niezmienne, każda ramka w pamięci nie może być zmieniona, jeśli chcesz dodać lub usunąć wartości to musisz stworzyć nową. Nazwy kolumn możesz wyczytać ze źródła bądź co jest bardziej poprawne stworzyć ze schematu.

Apache Spark Dataframe

Dataframe nie tylko wspiera podstawowe typy danych ale również skomplikowane. Oto przykłady: maps, structs, dates, timestamp, fields. Lista wszystkich typów danych w Apache Spark Dataframe. Wszystkie typy danych są dostępne w paczce import org.apache.spark.sql.types._ lub dla pythona from pyspark.sql.types import *

Data typeValue type in ScalaAPI to access or create a data type
ByteTypeByteByteType
ShortTypeShortShortType
IntegerTypeIntIntegerType
LongTypeLongLongType
FloatTypeFloatFloatType
DoubleTypeDoubleDoubleType
DecimalTypejava.math.BigDecimalDecimalType
StringTypeStringStringType
BinaryTypeArray[Byte]BinaryType
BooleanTypeBooleanBooleanType
TimestampTypejava.sql.TimestampTimestampType
DateTypejava.sql.DateDateType
ArrayTypescala.collection.SeqArrayType(elementType, [containsNull])
Note: The default value of containsNull is true.
MapTypescala.collection.MapMapType(keyType, valueType, [valueContainsNull])
Note: The default value of valueContainsNull is true.
StructTypeorg.apache.spark.sql.RowStructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructFieldThe value type in Scala of the data type of this field(For example, Int for a StructField with the data type IntegerType)StructField(name, dataType, [nullable])
Note: The default value of nullable is true.

Jak stworzyć Dataframe

Jest kilka metod żeby stworzyć Dataframe w Apache Spark.

Najczęściej będziesz to robił wczytując plik z dysku. Bądź pobierając dane z bazy lub z tabel Hive. Możesz też użyć metody createDataFrame(), bądź stworzyć ją z obiektu Row.

  • JSON
  • parquet
  • JDBC
  • ORC
  • Tabele Hive lub bazę zgodną ze standardem JDBC

Schemat danych (schema)

Bardzo ważnym etapem przy przetwarzaniu danych jest walidacja pliku. Na tym etapie sprawdź czy nazwy kolumn i ich typy danych pasują do schematu. Schemat danych to nic innego jak lista kolumn jakie powinien mieć Dataframe oraz typ danych np. String, Integer, Array ect.

Jak stworzyć schemat danych

Są dwie najważniejsze metody do stworzenia schematu danych.

  1. Programatycznie przy użyciu skryptu
  2. Przy użyciu DDL (Data Definition Language)
  3. Wczytać schemat z pliku
%scala
import org.apache.spark.sql.types._
// 1. Tworzenie schematu z typu StructType
val schema = StructType(
    Array(StructField("Spark Version",StringType,true),
          StructField("Scala Version",StringType,true),
          StructField("Repository",StringType,true),
          StructField("Usages",IntegerType,true),
          StructField("Date",StringType,true),
          StructField("Updated",IntegerType,true)))
// 2. DDL
val schema2 = "`Date` STRING, `Repository` STRING, `Scala Version` STRING, `Spark Version` STRING, `Updated` INT, `Usages` INT"
// 3. Schemat można stworzyć z pliku np json 
val schema3 = spark.read.format("json").option("multiline","true").load("/FileStore/tables/dataframe.json").schema
// 4. Jeśli zajdzie potrzeba możesz pobrać schemat z dysku w formie json, czsami jest to przydatne
val schema4 = spark.read.format("json").option("multiline","true").load("/FileStore/tables/dataframe.json").schema.json
%python
from pyspark.sql.types import *
# 1. Tworzenie schematu z typu StructType
schema = StructType([StructField("Spark Version",StringType(),True),
                     StructField("Scala Version",StringType(),True),
                     StructField("Repository",StringType(),True),
                     StructField("Usages",IntegerType(),True),
                     StructField("Date",StringType(),True),
                     StructField("Updated",IntegerType(),True)])
# 2. DDL
schema2 = "`Date` STRING, `Repository` STRING, `Scala Version` STRING, `Spark Version` STRING, `Updated` INT, `Usages` INT"
# 3. Schemat można stworzyć z pliku np json 
schema3 = spark.read.format("json").option("multiline","true").load("/FileStore/tables/dataframe.json").schema
# 4. Jeśli zajdzie potrzeba możesz pobrać schemat do json, czsami jest to przydatne
schema4 = spark.read.format("json").option("multiline","true").load("/FileStore/tables/dataframe.json").schema.json

Tworzymy ramki danych z pliku

Najczęściej spotykana metoda to tworzenie ramki danych z pliku, w zależności od formatu (json, csv, avro, parquet, text, orc) możesz do tego użyć klasy DataFrameReader

%scala
val file = spark.read.format("json")
.option("multiline","true")
.schema(schema)
.load("/FileStore/tables/dataframe.json")
%python 
file = spark.read.format("json") \
.option("multiline","true") \
.schema(schema) \
.load("/FileStore/tables/dataframe.json")

Tworzymy Dataframe używając metod createDataFrame() i toDF()

%scala
val kolumny = Seq("Spark Version","Scala Version", "Repository", "Usages", "Date", "Updated")
val dane = Seq(
  ("3.2.0", "2.13", "central", 15, "Oct, 2021", 20211026),
  ("3.1.2", "2.12", "central", 77, "May, 2021", 20211026),
  ("3.1.1", "2.12", "central", 102, "Jan, 2021", 20211026))

val df = spark.createDataFrame(dane).toDF(kolumny:_*)
%python
kolumny = ["Spark Version","Scala Version", "Repository", "Usages", "Date", "Updated"]
dane = [("3.2.0", "2.13", "central", 15, "Oct, 2021", 20211026),
           ("3.1.2", "2.12", "central", 77, "May, 2021", 20211026),
           ("3.1.1", "2.12", "central", 102, "Jan, 2021", 20211026)]

df = spark.createDataFrame(dane).toDF(*kolumny)

Tworzenie ramki danych z tabeli SQL Server

%scala
val user = "sqladminuser..."
val password = "Password..."
val jdbc = "jdbc:sqlserver://acmeservercentral.database.windows.net:1433;database=db_sqlmain;slProtocol=TLSv1"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  
val listOfTables = spark.read
  .format("jdbc")
  .option("url", jdbc)
  .option("user", user)
  .option("password", password)
  .option("driver", driver)
  .option("query",s"SELECT * FROM information_schema.tables'")
  .load()
%python
user = "sqladminuser..."
password = "Password..."
jdbc = "jdbc:sqlserver://acmeservercentral.database.windows.net:1433;database=db_sqlmain;slProtocol=TLSv1"
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
 
listOfTables = spark.read \
  .format("jdbc") \
  .option("url", jdbc) \
  .option("user", user) \
  .option("password", password) \
  .option("driver", driver) \
  .option("query",s"SELECT * FROM information_schema.tables'") \
  .load()

Tworzenie Pandas DataFrame przy użyciu Python

Pandas są obiektem występującym tylko w Pythonie, jeśli masz Pandas DataFrame to bardzo łatwo możesz go transformować do sparkowej ramki danych.

%python
import pandas as pd

dane = [["3.2.0", "2.13", "central", 15, "Oct, 2021", 20211026], 
        ["3.1.2", "2.12", "central", 77, "May, 2021", 20211026],
        ["3.1.1", "2.12", "central", 102, "Jan, 2021", 20211026]]

kolumny = ["Spark Version","Scala Version", "Repository", "Usages", "Date", "Updated"]

pandasDF = pd.DataFrame(dane, columns=kolumny)
sparkDf = spark.createDataFrame(pandasDF)
display(sparkDf)

Tworzenie ramki danych przy użyciu Case Classes w Scali

%scala
case class Spark(sparkVersion: String, scalaVersion: String, repository: String, usages: Long, date: String, updated: Long)

val dane = Seq(Spark("3.2.0", "2.13", "central", 15, "Oct, 2021", 20211026),
                 Spark("3.1.2", "2.12", "central", 77, "May, 2021", 20211026),
                 Spark("3.1.1", "2.12", "central", 102, "Jan, 2021", 20211026))
val df = spark.createDataFrame(dane)

Tworzenie ramki danych z obiektu Row

%scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rows = Seq(
  Row("3.2.0", "2.13", "central", 15, "Oct, 2021", 20211026),
  Row("3.1.2", "2.12", "central", 77, "May, 2021", 20211026),
  Row("3.1.1", "2.12", "central", 102, "Jan, 2021", 20211026))

val auctions = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
%python
from pyspark.sql import Row

rows = [Row("3.2.0", "2.13", "central", 15, "Oct, 2021", 20211026), 
        Row("3.1.2", "2.12", "central", 77, "May, 2021", 20211026),
        Row("3.1.1", "2.12", "central", 102, "Jan, 2021", 20211026)]

data = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

Tutaj możesz pobrać notatnik Databricks DataFrame.dbc z przykładami.

Stworzenie obiektu Dataframe jest raczej łatwe, a kiedy jest w pamięci to już prosta droga do wykonania operacji analitycznych takich jak filtrowanie, liczenie, agregowanie, grupowanie ect. Dzięki takiej strukturze danych praca analityka jest znacznie łatwiejsza ponieważ przy mniejszej ilości kodu możesz osiągnąć więcej. W tym artykule znajdziesz trochę przydatnego kodu.

W kategorii:Spark Tagi:Apache Spark, Dataframe

Big Data ebook
Subskrybuj
Powiadom o
guest

guest

0 Komentarze
Najstarsze
Najnowsze Najwięcej głosów
Opinie w linii
Zobacz wszystkie komentarze

Pierwszy panel boczny

O MNIE

Narzędzia i dobre procesy do przetwarzania danych to podstawa sukcesu i wartości dla firmy. Czytaj więcej…

big data ebook

Ostatnie wpisy

Jak efektywnie korzystać z Databricks Assistant – 5 sprawdzonych praktyk

16.11.2025 By Krzysztof Nojman

Databricks DQX

Jakość danych w Databricks DQX

28.01.2025 By Krzysztof Nojman

spark joins

Jak Spark robi join?

13.01.2025 By Krzysztof Nojman

Linki społecznościowe

  • Facebook
  • GitHub
  • LinkedIn
  • YouTube

Wyszukiwanie

Footer

Najnowsze wpisy

  • Jak zainstalować Python whl na Serverless
  • Jak efektywnie korzystać z Databricks Assistant – 5 sprawdzonych praktyk
  • Jakość danych w Databricks DQX
  • Jak Spark robi join?
  • Czy JSON to samo zło
  • VS Code nowości AI 
  • Lista narzędzi AI dla każdego inżyniera, które warto znać

Tagi

AI Apache Spark Architektura Azure BIg Data Certyfikat cloud Databricks Data Factory Dataframe DQX ETL Hurtownia Danych Intellij IoT Jaka technologia Join Kod Konfiguracja lakehouse Narzędzia Optymalizacja pyspark Spark Windows 10 zadania

Informacje Prawne

To jest nudna część lecz wymagana, wszystkie notki prawne o stronie znajdziecie tutaj.

Polityka Prywatności

Regulamin

Copyright © 2026 · Wszelkie prawa zastrzeżone. Krzysztof Nojman

wpDiscuz