logo

PySpark SQL

Apache Spark er den mest succesrige software fra Apache Software Foundation og designet til hurtig databehandling. Flere industrier bruger Apache Spark til at finde deres løsninger. PySpark SQL er et modul i Spark, som integrerer relationel behandling med Sparks funktionelle programmerings-API. Vi kan udtrække dataene ved at bruge et SQL-forespørgselssprog. Vi kan bruge forespørgslerne på samme måde som SQL-sproget.

Hvis du har en grundlæggende forståelse af RDBMS, vil PySpark SQL være nem at bruge, hvor du kan udvide begrænsningen af ​​traditionel relationel databehandling. Spark understøtter også Hive Query Language, men der er begrænsninger for Hive-databasen. Spark SQL blev udviklet for at fjerne ulemperne ved Hive-databasen. Lad os se på følgende ulemper ved Hive:

Ulemper ved Hive

  • Den kan ikke genoptage behandlingen, hvilket betyder, at hvis udførelsen mislykkes midt i en arbejdsgang, kan du ikke genoptage fra det sted, hvor den sad fast.
  • Vi kan ikke slippe de krypterede databaser i kaskade, når papirkurven er aktiveret. Det fører til udførelsesfejlen. For at droppe en sådan type database skal brugerne bruge indstillingen Purge.
  • Ad-hoc-forespørgslerne udføres ved hjælp af MapReduce, som lanceres af Hive, men når vi analyserer den mellemstore database, forsinker det ydeevnen.
  • Hive understøtter ikke opdateringen eller sletningen.
  • Det er begrænset til underforespørgselsunderstøttelse.

Disse ulemper er grundene til at udvikle Apache SQL.

PySpark SQL kort introduktion

PySpark understøtter integreret relationel behandling med Sparks funktionelle programmering. Det giver support til de forskellige datakilder for at gøre det muligt at væve SQL-forespørgsler med kodetransformationer, hvilket resulterer i et meget kraftfuldt værktøj.

PySpark SQL etablerer forbindelsen mellem RDD og relationstabellen. Det giver meget tættere integration mellem relationel og proceduremæssig behandling gennem deklarativ Dataframe API, som er integreret med Spark-kode.

Ved hjælp af SQL kan det være let tilgængeligt for flere brugere og forbedre optimeringen for de nuværende. Det understøtter også den brede vifte af datakilder og algoritmer i Big-data.

Funktion af PySpark SQL

Funktionerne i PySpark SQL er angivet nedenfor:

1) Konsistensdataadgang

Det giver ensartet dataadgang betyder, at SQL understøtter en delt måde at få adgang til en række datakilder som f.eks Hive, Avro, Parket, JSON og JDBC. Det spiller en væsentlig rolle i at imødekomme alle eksisterende brugere i Spark SQL.

2) Inkorporering med Spark

PySpark SQL-forespørgsler er integreret med Spark-programmer. Vi kan bruge forespørgslerne i Spark-programmerne.

100 km/t til mph

En af dens største fordele er, at udviklere ikke behøver manuelt at administrere tilstandsfejl eller holde applikationen synkroniseret med batchjobs.

3) Standardforbindelse

Det giver en forbindelse gennem JDBC eller ODBC, og disse to er industristandarderne for tilslutning til business intelligence-værktøjer.

4) Brugerdefinerede funktioner

PySpark SQL har en sprogkombineret brugerdefineret funktion (UDF'er). UDF bruges til at definere en ny kolonnebaseret funktion, der udvider ordforrådet for Spark SQL's DSL til transformation af DataFrame.

5) Hive-kompatibilitet

PySpark SQL kører umodificerede Hive-forespørgsler på aktuelle data. Det giver fuld kompatibilitet med aktuelle Hive-data.

PySpark SQL-modul

Nogle vigtige klasser af Spark SQL og DataFrames er følgende:

    pyspark.sql.SparkSession:Det repræsenterer hovedindgangspunktet for DataFrame og SQL-funktionalitet.pyspark.sql.DataFrame:Det repræsenterer en distribueret samling af data grupperet i navngivne kolonner.pyspark.sql.Column:Det repræsenterer et kolonneudtryk i en DataFrame. pyspark.sql.Row:Det repræsenterer en række af data i en DataFrame. pyspark.sql.GroupedData:Aggregationsmetoder, returneret af DataFrame.groupBy(). pyspark.sql.DataFrameNaFunctions:Det repræsenterer metoder til håndtering af manglende data (nullværdier).pyspark.sql.DataFrameStatFunctions:Det repræsenterer metoder til statistikfunktionalitet.pysark.sql.funktioner:Det repræsenterer en liste over indbyggede funktioner, der er tilgængelige for DataFrame. pyspark.sql.types:Det repræsenterer en liste over tilgængelige datatyper.pyspark.sql.Window:Det bruges til at arbejde med vinduesfunktioner.

Overvej følgende eksempel på PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Produktion:

 +-----+ |hello| +-----+ |spark| +-----+ 

Kodeforklaring:

I ovenstående kode har vi importeret findspark modul og ringede findspark.init() konstruktør; derefter importerede vi SparkSession-modulet for at oprette spark-session.

fra pyspark.sql importer SparkSession

En gnistsession kan bruges til at oprette Dataset og DataFrame API. En SparkSession kan også bruges til at oprette DataFrame, registrere DataFrame som en tabel, udføre SQL over tabeller, cache tabel og læse parketfil.

klassebygger

Det er en bygherre af Spark Session.

getOrCreate()

java samlinger

Det bruges til at få en eksisterende SparkSession, eller hvis der ikke er nogen eksisterende, skal du oprette en ny baseret på indstillingerne i builder.

Få andre metoder

Få metoder til PySpark SQL er følgende:

1. appnavn (navn)

Det bruges til at angive navnet på applikationen, som vil blive vist i Spark-webbrugergrænsefladen. Parameteren navn accepterer navnet på parameteren.

2. config(key=Ingen, værdi = Ingen, conf = Ingen)

Det bruges til at indstille en konfigurationsindstilling. Indstillinger, der er indstillet ved hjælp af denne metode, overføres automatisk til begge SparkConf og SparkSession 's konfiguration.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Parametre:

    nøgle-En nøglenavnstreng for en konfigurationsegenskab.værdi-Det repræsenterer værdien af ​​en konfigurationsegenskab.konf -En forekomst af SparkConf.

3. mester(mester)

Den indstiller den spark master-url, der skal oprettes forbindelse til, såsom 'local' til at køre lokalt, 'local[4]' til at køre lokalt med 4 kerner.

Parametre:

    mestre:en url til spark master.

4. SparkSession.catalog

Det er en grænseflade, som brugeren kan oprette, slette, ændre eller forespørge på den underliggende database, tabeller, funktioner osv.

5. SparkSession.conf

Det er runtime-konfigurationsgrænseflade til gnist. Dette er grænsefladen, hvorigennem brugeren kan få og indstille alle Spark- og Hadoop-konfigurationer, der er relevante for Spark SQL.

klasse pyspark.sql.DataFrame

Det er en distribueret samling af data grupperet i navngivne kolonner. En DataFrame ligner den relationelle tabel i Spark SQL, kan oprettes ved hjælp af forskellige funktioner i SQLContext.

 student = sqlContext.read.csv('...') 

Efter oprettelse af dataramme kan vi manipulere den ved hjælp af de flere domænespecifikke sprog (DSL), som er foruddefinerede funktioner i DataFrame. Overvej følgende eksempel.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Lad os overveje følgende eksempel:

Forespørgsel ved hjælp af Spark SQL

I den følgende kode opretter vi først en DataFrame og udfører SQL-forespørgslerne for at hente dataene. Overvej følgende kode:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Produktion:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Brug groupBy()-funktionen

medietransmission

groupBy()-funktionen indsamler lignende kategoridata.

 songdf.groupBy('Genre').count().show() 

Produktion:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

fordeling (antalpartitioner, *cols)

Det fordeling() returnerer en ny DataFrame, som er et partitioneringsudtryk. Denne funktion accepterer to parametre antal partitioner og *kol. Det antal partitioner parameter angiver det ønskede antal kolonner.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Produktion:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows