• Przejdź do treści
  • Przejdź to drugiego menu
  • Przejdź do głównego paska bocznego
  • Przejdź do stopki
  • START
  • BLOG
  • NEWSLETTER
  • KIM JESTEM
  • KONTAKT
Cegładanych

Cegładanych

Dane - Databricks i Chmura Azura

  • Azure
  • Databricks
  • Spark
  • Etl
  • Engineering
  • AI

Apache Spark operacje na kolumnach

23.11.2024 Krzysztof Nojman

Kolumny Apache Spark

Kolumny

Kolumny w Spark Dataframe maja taką samą charakterystykę, jak w przypadku Pandas czy R DataFrames, na pewno znasz je z excela, bądź bazy relacyjnej. Koncepcja jest taka sama. Możesz dokonywać różnych operacji na wybranych lub wszystkich kolumnach. Operacje te będą zależeć od typu danych kolumny.

W Sparku możesz odnieść się do kolumny na kilka sposobów w zależności od języka z jakim pracujesz. Dobrą praktyką jest zaimportowanie odpowiednich bibliotek, najlepiej zrobić to w pierwszych blokach notatnika. Tak jak się to robi w IDE programując w różnych językach.

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, MapType

Przykładowe dane

Nie martw się jeśli nie masz danych, mam dobre wieści, w każdym środowisku roboczym Databricks (Community edition, Azure i AWS) są dostępne dane do nauki Znajdują się w ścieżce dbfs:/databricks-datasets/

Są tam przeróżne dane przydatne w wielu modułach treningowych.

Akcje vs Transformacje

Tutaj lekkie przypomnienie o bardzo ważnej funkcjonalności Sparka. Spark jest leniwy, i tego możemy się od niego nauczyć 😁. Nie robić nic od razu tylko przemyśleć, jak coś zrobić.

Dopóki nie wywołasz akcji Spark nie wykona żadnych transformacji, bo po co, skoro nie każesz niczego pokazać lub zapisać na dysku to on nic nie musi robić. Taki cwaniak. Jest to bardzo rozsądne i znacznie pomaga zoptymalizować operację. Ma to swoje zalety:

  • Optymalizacja: Leniwa ocena pozwala Sparkowi zbudować logiczny plan wykonania, zwany DAG (Directed Acycle Graph)
  • Zmniejszone wykonanie IO oraz użycia pamięci: Leniwa ocena umożliwia platformie Spark potokowanie (pipelining) operacji i wykonywanie ich w jednej operacji.

Przykładowe operacje na kolumnach

MetodaOpis
alias(*alias, **kwargs)Zwraca kolumnę z nową nazwą lub nazw (w przypadku wyrażeń zwracających więcej niż jedną kolumnę, takich jak explode).
asc()Zwraca kolumnę posortowaną rosnąco
asc_nulls_first()Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą nulle
asc_nulls_last()Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą wartości a null na końcu
astype(dataType)Jest to alias wyrażenia cast()
between(lowerBound, upperBoundZwraca wartość boolean True, jeżeli bieżąca kolumna mieści się między dolnym a górnym zakresem, włącznie.
bitwiseAND(other)Oblicza operację logiczną 'AND’ tego z innym wyrażeniem.
bitwiseOR(other)Oblicza operację logiczną 'OR’ tego z innym wyrażeniem.
bitwiseXOR(other)Oblicza operację logiczną 'XOR’ tego z innym wyrażeniem. Wyrażenie XOR (ang. exclusive or) to operator logiczny, który zwraca prawdę (True) tylko wtedy, gdy jedno z dwóch porównywanych wyrażeń jest prawdziwe (ma wartość True), a drugie jest fałszywe (ma wartość False). Gdy oba wyrażenia mają tę samą wartość logiczną (obie są prawdziwe lub obie są fałszywe), wyrażenie XOR zwraca fałsz (False).
cast(dataType)Zmiena typ danych kolumny
contains(other)Sprawdza czy kolumna zawiera element z listy
desc()Zwraca kolumnę posortowaną malejąco
desc_nulls_first()Zwraca kolumnę posortowaną malejąco. W pierszej kolejności będą nulle
desc_nulls_last()Zwraca kolumnę posortowaną malejąco. W pierszej kolejności będą wartości a null na końcu
dropFields(*fieldNames)Wyrażenie, które usuwa pola na podstawie nazwy
endswith(other)Zwraca wartość boolean gdy String kończy się wybraną wartością
eqNullSafe(other)Test równości bezpieczny dla wartości null
getField(name)Wyrażenie, które pobiera pole po nazwie w typie danych StructType.
getItem(key)Wyrażenie, które pobiera element z danej pozycji listy lub pobiera element według klucza z słownika
ilike(Inny)Wyrażenie SQL ILIKE (wielkość liter nie ma znaczenia).
isNotNull()Zwraca wartośc boolean True, jeśli bieżące wyrażenie NIE ma wartości null.
isNull()Zwraca wartośc boolean True, jeśli bieżące wyrażenie ma wartość null.
isin(*cols)Wyrażenie logiczne, które ma wartość true, jeśli wartość tego wyrażenia jest zawarta w obliczonych wartościach argumentów.
like(Other)Wyrażenie podobne do SQL.
name(*alias, **kwargs)Jest pseudonimem dla alias().
otherwise(wartość)Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.
over(window)Zdefiniuj kolumnę okienkową.
rlike(Other)Wyrażenie SQL RLIKE (LIKE z wyrażeniem regularnym).
startswith(Other)Ciąg zaczyna się od.
substr(początekPoz, długość)Zwróć a Column, które jest podciągiem kolumny.
when(stan, wartość)Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.
withField(nazwa pola, kolumna)Wyrażenie, które dodaje/zastępuje pole według StructTypenazwy.

Dataframe

Żeby stworzyć Dataframe to najprostszą operacją jest wczytanie danych z jakiegoś źródła np. z pliku bądź bazy. Najczęstszą klasą używaną w każdym projekcie jest DataFrameReader, który służy do tworzenia ramki danych.

Przykładowy odczyt pliku csv.

filePath = "dbfs:/databricks-datasets/retail-org/customers/customers.csv"
namesDf = (spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load(filePath))

alias(*alias, **kwargs)

Zwraca kolumnę z nową nazwą lub nazw (w przypadku wyrażeń zwracających więcej niż jedną kolumnę, takich jak explode).

customersDf.select(col("street").alias("Ulica")).display()

asc()

Zwraca kolumnę posortowaną rosnąco

customersDf.orderBy(col("customer_id").asc()).display()  

asc_nulls_first()

Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą nulle

customersDf.orderBy(col("tax_id").asc_nulls_first()).display()

asc_nulls_last()

Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą wartości a null na końcu

customersDf.orderBy(col("tax_id").asc_nulls_last()).display()

astype(dataType)

Jest to alias wyrażenia cast()

customersDf.select(col("customer_id").astype("double")).display()

between(lowerBound, upperBound)

Zwraca wartość boolean True, jeżeli bieżąca kolumna mieści się między dolnym a górnym zakresem, włącznie.

#Zwracam uwagę na nazwę kolumny, tutaj powinieneś zrobić cast żeby to ładnie wyglądało

<code>customersDf.select(col("number").between(1,400)).display()</code>

bitwiseAND()

Oblicza operację logiczną 'AND’ tego z innym wyrażeniem.

data = [(1, 1), (2, 2), (3, 4),(-1,-3)]

df = spark.createDataFrame(data, ['value1', 'value2'])
new = (df.select("value1","value2")
      .withColumn('result', col('value1').bitwiseAND(col('value2')))).display()

bitwiseOR()

Oblicza operację logiczną 'OR’ tego z innym wyrażeniem.

data = [(1, 1), (2, 2), (3, 4),(-1,-3)]

df = spark.createDataFrame(data, ['value1', 'value2'])
new = (df.select("value1","value2")
      .withColumn('result', col('value1').bitwiseOR(col('value2')))).display()

bitwiseXOR()

Oblicza operację logiczną 'XOR’ tego z innym wyrażeniem. Wyrażenie XOR (ang. exclusive or) to operator logiczny, który zwraca prawdę (True) tylko wtedy, gdy jedno z dwóch porównywanych wyrażeń jest prawdziwe (ma wartość True), a drugie jest fałszywe (ma wartość False). Gdy oba wyrażenia mają tę samą wartość logiczną (obie są prawdziwe lub obie są fałszywe), wyrażenie XOR zwraca fałsz (False).

data = [(1, 1), (2, 2), (3, 4),(-1,-3)]

df = spark.createDataFrame(data, ['value1', 'value2'])
new = (df.select("value1","value2")
      .withColumn('result', col('value1').bitwiseXOR(col('value2')))).display()

cast(dataType)

Zmiana typ danych kolumny

customersDf.select(col("street").cast("integer")).display() 

contains()

Sprawdza czy kolumna zawiera element z listy

containsDf = customersDf.select(col("street"),col("street").contains("RD").alias("Road_Check")).display() 

desc()

Zwraca kolumnę posortowaną malejąco

containsDf.orderBy(col("Road_Check").desc()).display() 

desc_nulls_first()

Zwraca kolumnę posortowaną malejąco. W pierwszej kolejności będą nulle

customersDf.orderBy(col("tax_code").desc_nulls_first()).display() 

desc_nulls_last()

Zwraca kolumnę posortowaną malejąco. W pierwszej kolejności będą wartości a null na końcu

customersDf.orderBy(col("region").desc_nulls_last()).display() 

dropFields(fieldNames)

Wyrażenie, które usuwa pola na podstawie nazwy

from pyspark.sql.functions import col  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType  
  
# Schemat
schema = StructType([  
  StructField("id", IntegerType(), True),  
  StructField("name", StringType(), True),  
  StructField("address", StructType([  
    StructField("street", StringType(), True),  
    StructField("city", StringType(), True),  
    StructField("state", StringType(), True),  
    StructField("zip", StringType(), True),  
    StructField("phone", StringType(), True)  
  ]), True)  
])  
# DataFrame   
data = [(1, "John Doe", {"street": "123 Main St", "city": "Anytown", "state": "CA", "zip": "12345", "phone": "555-555-1234"})]  
df = spark.createDataFrame(data, schema)  
  
# Drop fields 
df = df.withColumn("address_po_zmianie", col("address").dropFields("phone"))  

df.display()

endswith(other)

Zwraca wartość boolean gdy String kończy się wybraną wartością

customersDf.select(col("street"),col("street").endswith("ST")).display() 

eqNullSafe(other)

Test równości bezpieczny dla wartości null

customersDf.select(col("region"),col("region").eqNullSafe("VA")).withColumn("eqNullSafe",col("region").eqNullSafe(None)).display() 

getField(name)

Wyrażenie, które pobiera pole po nazwie w typie danych StructType.

dfGetField = df.withColumn("phone", col("address").getField("phone")).display()

getItem(key)

Wyrażenie, które pobiera element z danej pozycji listy lub pobiera element według klucza z słownika

dfGetItem = df.withColumn("phone", col("address").getItem("phone")).display()

ilike(Inny)

Wyrażenie SQL ILIKE (wielkość liter nie ma znaczenia). Wykona porównanie stringów

customersDf.select(col("street"),col("street").ilike("%RD%")).display() 

isNotNull()

Zwraca wartość boolean True, jeśli bieżące wyrażenie NIE ma wartości null.

customersDf.select(col("street"),col("street").isNotNull().alias("nie nulowe")).display() 

isNull()

Zwraca wartość boolean True, jeśli bieżące wyrażenie ma wartość null.

customersDf.select(col("region"),col("region").isNull()).display() 

isin(columns)

Wyrażenie logiczne, które ma wartość true, jeśli wartość tego wyrażenia jest zawarta w obliczonych wartościach argumentów.

lista = ["BREMEN", "VIENNA", "COLUMBUS"]

customersDf.select(col("city"),col("city").isin(lista)).display() 

like(Other)

Wyrażenie podobne do SQL.

customersDf.select(col("street"),col("street").like("%ST%")).display() 

**name(*alias, kwargs))

Jest pseudonimem dla alias().

customersDf.select(col("street").name("ulica")).display() 

over(window)

Zdefiniuj kolumnę okienkową.

from pyspark.sql.window import Window

windowSpec = Window.partitionBy("loyalty_segment").orderBy("customer_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
customersDf.select("customer_id","units_purchased","loyalty_segment").withColumn("dense_rank", sum(col("units_purchased")).over(windowSpec)).display()

rlike(Other)

Wyrażenie SQL RLIKE (LIKE z wyrażeniem regularnym).

#znajdz numver w tekscie
customersDf.select("street",col("street").rlike("[0-9]")).display()

startswith(Other)

Ciąg zaczyna się od.

customersDf.select(col("city"),col("city").startswith("A")).display() 

substr(początek Poz, długość)

Zwróć a Column, które jest podciągiem kolumny.

customersDf.select("ship_to_address",col("ship_to_address").substr(1,2).name('state')).display() 

when(stan, wartość)

Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.

otherwise(wartość)

Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.

customersDf.withColumn("age_group", when(col("units_purchased") < 18, "Mało")
                              .when((col("units_purchased")>= 40) & (col("units_purchased") <= 95), "Srednio")
                              .otherwise("Bardzo duzo")).display()

withField(nazwa pola, kolumna)

Wyrażenie, które dodaje/zastępuje pole według StructTypenazwy.

from pyspark.sql.functions import col  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType  

# DataFrame   
data = [(1, "John Doe", {"street": "123 Main St", "city": "Anytown", "state": "CA", "zip": "12345", "phone": "555-555-1234"})]  
df = spark.createDataFrame(data, schema)  
  
# Dodaję wartość number
df = df.withColumn("location", col('address').withField("number", lit(3)))

df.display()

Przykładowy notatnik

KolumnyPobierz

W kategorii:Spark

Big Data ebook
Subskrybuj
Powiadom o
guest

guest

0 Komentarze
Najstarsze
Najnowsze Najwięcej głosów
Opinie w linii
Zobacz wszystkie komentarze

Pierwszy panel boczny

O MNIE

Narzędzia i dobre procesy do przetwarzania danych to podstawa sukcesu i wartości dla firmy. Czytaj więcej…

big data ebook

Ostatnie wpisy

spark joins

Jak Spark robi join?

13.01.2025 By Krzysztof Nojman

Czy JSON to samo zło

04.01.2025 By Krzysztof Nojman

VS Code nowości AI 

09.12.2024 By Krzysztof Nojman

Linki społecznościowe

  • Facebook
  • GitHub
  • LinkedIn
  • YouTube

Wyszukiwanie

Footer

Najnowsze wpisy

  • Jakość danych w Databricks DQX
  • Jak Spark robi join?
  • Czy JSON to samo zło
  • VS Code nowości AI 
  • Lista narzędzi AI dla każdego inżyniera, które warto znać
  • Kilka pomysłów na konfigurację Databricks
  • Co pamięta wykonawca (executor🧠)

Tagi

AI Apache Spark Architektura Azure BIg Data Certyfikat cloud Databricks Data Factory Dataframe DQX ETL Hurtownia Danych Intellij IoT Jaka technologia Join Kod Konfiguracja lakehouse Narzędzia Optymalizacja pyspark Spark Windows 10 zadania

Informacje Prawne

To jest nudna część lecz wymagana, wszystkie notki prawne o stronie znajdziecie tutaj.

Polityka Prywatności

Regulamin

Copyright © 2025 · Wszelkie prawa zastrzeżone. Krzysztof Nojman

wpDiscuz