Kolumny
Kolumny w Spark Dataframe maja taką samą charakterystykę, jak w przypadku Pandas czy R DataFrames, na pewno znasz je z excela, bądź bazy relacyjnej. Koncepcja jest taka sama. Możesz dokonywać różnych operacji na wybranych lub wszystkich kolumnach. Operacje te będą zależeć od typu danych kolumny.
W Sparku możesz odnieść się do kolumny na kilka sposobów w zależności od języka z jakim pracujesz. Dobrą praktyką jest zaimportowanie odpowiednich bibliotek, najlepiej zrobić to w pierwszych blokach notatnika. Tak jak się to robi w IDE programując w różnych językach.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, MapType
Przykładowe dane
Nie martw się jeśli nie masz danych, mam dobre wieści, w każdym środowisku roboczym Databricks (Community edition, Azure i AWS) są dostępne dane do nauki Znajdują się w ścieżce dbfs:/databricks-datasets/
Są tam przeróżne dane przydatne w wielu modułach treningowych.
Akcje vs Transformacje
Tutaj lekkie przypomnienie o bardzo ważnej funkcjonalności Sparka. Spark jest leniwy, i tego możemy się od niego nauczyć 😁. Nie robić nic od razu tylko przemyśleć, jak coś zrobić.
Dopóki nie wywołasz akcji Spark nie wykona żadnych transformacji, bo po co, skoro nie każesz niczego pokazać lub zapisać na dysku to on nic nie musi robić. Taki cwaniak. Jest to bardzo rozsądne i znacznie pomaga zoptymalizować operację. Ma to swoje zalety:
- Optymalizacja: Leniwa ocena pozwala Sparkowi zbudować logiczny plan wykonania, zwany DAG (Directed Acycle Graph)
- Zmniejszone wykonanie IO oraz użycia pamięci: Leniwa ocena umożliwia platformie Spark potokowanie (pipelining) operacji i wykonywanie ich w jednej operacji.
Przykładowe operacje na kolumnach
| Metoda | Opis |
|---|---|
| alias(*alias, **kwargs) | Zwraca kolumnę z nową nazwą lub nazw (w przypadku wyrażeń zwracających więcej niż jedną kolumnę, takich jak explode). |
| asc() | Zwraca kolumnę posortowaną rosnąco |
| asc_nulls_first() | Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą nulle |
| asc_nulls_last() | Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą wartości a null na końcu |
| astype(dataType) | Jest to alias wyrażenia cast() |
| between(lowerBound, upperBound | Zwraca wartość boolean True, jeżeli bieżąca kolumna mieści się między dolnym a górnym zakresem, włącznie. |
| bitwiseAND(other) | Oblicza operację logiczną 'AND’ tego z innym wyrażeniem. |
| bitwiseOR(other) | Oblicza operację logiczną 'OR’ tego z innym wyrażeniem. |
| bitwiseXOR(other) | Oblicza operację logiczną 'XOR’ tego z innym wyrażeniem. Wyrażenie XOR (ang. exclusive or) to operator logiczny, który zwraca prawdę (True) tylko wtedy, gdy jedno z dwóch porównywanych wyrażeń jest prawdziwe (ma wartość True), a drugie jest fałszywe (ma wartość False). Gdy oba wyrażenia mają tę samą wartość logiczną (obie są prawdziwe lub obie są fałszywe), wyrażenie XOR zwraca fałsz (False). |
| cast(dataType) | Zmiena typ danych kolumny |
| contains(other) | Sprawdza czy kolumna zawiera element z listy |
| desc() | Zwraca kolumnę posortowaną malejąco |
| desc_nulls_first() | Zwraca kolumnę posortowaną malejąco. W pierszej kolejności będą nulle |
| desc_nulls_last() | Zwraca kolumnę posortowaną malejąco. W pierszej kolejności będą wartości a null na końcu |
| dropFields(*fieldNames) | Wyrażenie, które usuwa pola na podstawie nazwy |
| endswith(other) | Zwraca wartość boolean gdy String kończy się wybraną wartością |
| eqNullSafe(other) | Test równości bezpieczny dla wartości null |
| getField(name) | Wyrażenie, które pobiera pole po nazwie w typie danych StructType. |
| getItem(key) | Wyrażenie, które pobiera element z danej pozycji listy lub pobiera element według klucza z słownika |
| ilike(Inny) | Wyrażenie SQL ILIKE (wielkość liter nie ma znaczenia). |
| isNotNull() | Zwraca wartośc boolean True, jeśli bieżące wyrażenie NIE ma wartości null. |
| isNull() | Zwraca wartośc boolean True, jeśli bieżące wyrażenie ma wartość null. |
| isin(*cols) | Wyrażenie logiczne, które ma wartość true, jeśli wartość tego wyrażenia jest zawarta w obliczonych wartościach argumentów. |
| like(Other) | Wyrażenie podobne do SQL. |
| name(*alias, **kwargs) | Jest pseudonimem dla alias(). |
| otherwise(wartość) | Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych. |
| over(window) | Zdefiniuj kolumnę okienkową. |
| rlike(Other) | Wyrażenie SQL RLIKE (LIKE z wyrażeniem regularnym). |
| startswith(Other) | Ciąg zaczyna się od. |
| substr(początekPoz, długość) | Zwróć a Column, które jest podciągiem kolumny. |
| when(stan, wartość) | Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych. |
| withField(nazwa pola, kolumna) | Wyrażenie, które dodaje/zastępuje pole według StructTypenazwy. |
Dataframe
Żeby stworzyć Dataframe to najprostszą operacją jest wczytanie danych z jakiegoś źródła np. z pliku bądź bazy. Najczęstszą klasą używaną w każdym projekcie jest DataFrameReader, który służy do tworzenia ramki danych.
Przykładowy odczyt pliku csv.
filePath = "dbfs:/databricks-datasets/retail-org/customers/customers.csv"
namesDf = (spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load(filePath))
alias(*alias, **kwargs)
Zwraca kolumnę z nową nazwą lub nazw (w przypadku wyrażeń zwracających więcej niż jedną kolumnę, takich jak explode).
customersDf.select(col("street").alias("Ulica")).display()
asc()
Zwraca kolumnę posortowaną rosnąco
customersDf.orderBy(col("customer_id").asc()).display()
asc_nulls_first()
Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą nulle
customersDf.orderBy(col("tax_id").asc_nulls_first()).display()
asc_nulls_last()
Zwraca kolumnę posortowaną rosnąco. W pierszej kolejności będą wartości a null na końcu
customersDf.orderBy(col("tax_id").asc_nulls_last()).display()
astype(dataType)
Jest to alias wyrażenia cast()
customersDf.select(col("customer_id").astype("double")).display()
between(lowerBound, upperBound)
Zwraca wartość boolean True, jeżeli bieżąca kolumna mieści się między dolnym a górnym zakresem, włącznie.
#Zwracam uwagę na nazwę kolumny, tutaj powinieneś zrobić cast żeby to ładnie wyglądało
<code>customersDf.select(col("number").between(1,400)).display()</code>
bitwiseAND()
Oblicza operację logiczną 'AND’ tego z innym wyrażeniem.
data = [(1, 1), (2, 2), (3, 4),(-1,-3)]
df = spark.createDataFrame(data, ['value1', 'value2'])
new = (df.select("value1","value2")
.withColumn('result', col('value1').bitwiseAND(col('value2')))).display()
bitwiseOR()
Oblicza operację logiczną 'OR’ tego z innym wyrażeniem.
data = [(1, 1), (2, 2), (3, 4),(-1,-3)]
df = spark.createDataFrame(data, ['value1', 'value2'])
new = (df.select("value1","value2")
.withColumn('result', col('value1').bitwiseOR(col('value2')))).display()
bitwiseXOR()
Oblicza operację logiczną 'XOR’ tego z innym wyrażeniem. Wyrażenie XOR (ang. exclusive or) to operator logiczny, który zwraca prawdę (True) tylko wtedy, gdy jedno z dwóch porównywanych wyrażeń jest prawdziwe (ma wartość True), a drugie jest fałszywe (ma wartość False). Gdy oba wyrażenia mają tę samą wartość logiczną (obie są prawdziwe lub obie są fałszywe), wyrażenie XOR zwraca fałsz (False).
data = [(1, 1), (2, 2), (3, 4),(-1,-3)]
df = spark.createDataFrame(data, ['value1', 'value2'])
new = (df.select("value1","value2")
.withColumn('result', col('value1').bitwiseXOR(col('value2')))).display()
cast(dataType)
Zmiana typ danych kolumny
customersDf.select(col("street").cast("integer")).display()
contains()
Sprawdza czy kolumna zawiera element z listy
containsDf = customersDf.select(col("street"),col("street").contains("RD").alias("Road_Check")).display()
desc()
Zwraca kolumnę posortowaną malejąco
containsDf.orderBy(col("Road_Check").desc()).display()
desc_nulls_first()
Zwraca kolumnę posortowaną malejąco. W pierwszej kolejności będą nulle
customersDf.orderBy(col("tax_code").desc_nulls_first()).display()
desc_nulls_last()
Zwraca kolumnę posortowaną malejąco. W pierwszej kolejności będą wartości a null na końcu
customersDf.orderBy(col("region").desc_nulls_last()).display()
dropFields(fieldNames)
Wyrażenie, które usuwa pola na podstawie nazwy
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Schemat
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("address", StructType([
StructField("street", StringType(), True),
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("zip", StringType(), True),
StructField("phone", StringType(), True)
]), True)
])
# DataFrame
data = [(1, "John Doe", {"street": "123 Main St", "city": "Anytown", "state": "CA", "zip": "12345", "phone": "555-555-1234"})]
df = spark.createDataFrame(data, schema)
# Drop fields
df = df.withColumn("address_po_zmianie", col("address").dropFields("phone"))
df.display()
endswith(other)
Zwraca wartość boolean gdy String kończy się wybraną wartością
customersDf.select(col("street"),col("street").endswith("ST")).display()
eqNullSafe(other)
Test równości bezpieczny dla wartości null
customersDf.select(col("region"),col("region").eqNullSafe("VA")).withColumn("eqNullSafe",col("region").eqNullSafe(None)).display()
getField(name)
Wyrażenie, które pobiera pole po nazwie w typie danych StructType.
dfGetField = df.withColumn("phone", col("address").getField("phone")).display()
getItem(key)
Wyrażenie, które pobiera element z danej pozycji listy lub pobiera element według klucza z słownika
dfGetItem = df.withColumn("phone", col("address").getItem("phone")).display()
ilike(Inny)
Wyrażenie SQL ILIKE (wielkość liter nie ma znaczenia). Wykona porównanie stringów
customersDf.select(col("street"),col("street").ilike("%RD%")).display()
isNotNull()
Zwraca wartość boolean True, jeśli bieżące wyrażenie NIE ma wartości null.
customersDf.select(col("street"),col("street").isNotNull().alias("nie nulowe")).display()
isNull()
Zwraca wartość boolean True, jeśli bieżące wyrażenie ma wartość null.
customersDf.select(col("region"),col("region").isNull()).display()
isin(columns)
Wyrażenie logiczne, które ma wartość true, jeśli wartość tego wyrażenia jest zawarta w obliczonych wartościach argumentów.
lista = ["BREMEN", "VIENNA", "COLUMBUS"]
customersDf.select(col("city"),col("city").isin(lista)).display()
like(Other)
Wyrażenie podobne do SQL.
customersDf.select(col("street"),col("street").like("%ST%")).display()
**name(*alias, kwargs))
Jest pseudonimem dla alias().
customersDf.select(col("street").name("ulica")).display()
over(window)
Zdefiniuj kolumnę okienkową.
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("loyalty_segment").orderBy("customer_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
customersDf.select("customer_id","units_purchased","loyalty_segment").withColumn("dense_rank", sum(col("units_purchased")).over(windowSpec)).display()
rlike(Other)
Wyrażenie SQL RLIKE (LIKE z wyrażeniem regularnym).
#znajdz numver w tekscie
customersDf.select("street",col("street").rlike("[0-9]")).display()
startswith(Other)
Ciąg zaczyna się od.
customersDf.select(col("city"),col("city").startswith("A")).display()
substr(początek Poz, długość)
Zwróć a Column, które jest podciągiem kolumny.
customersDf.select("ship_to_address",col("ship_to_address").substr(1,2).name('state')).display()
when(stan, wartość)
Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.
otherwise(wartość)
Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.
customersDf.withColumn("age_group", when(col("units_purchased") < 18, "Mało")
.when((col("units_purchased")>= 40) & (col("units_purchased") <= 95), "Srednio")
.otherwise("Bardzo duzo")).display()
withField(nazwa pola, kolumna)
Wyrażenie, które dodaje/zastępuje pole według StructTypenazwy.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# DataFrame
data = [(1, "John Doe", {"street": "123 Main St", "city": "Anytown", "state": "CA", "zip": "12345", "phone": "555-555-1234"})]
df = spark.createDataFrame(data, schema)
# Dodaję wartość number
df = df.withColumn("location", col('address').withField("number", lit(3)))
df.display()
