PySpark environment setup
Configuring Hadoop
When Spark accesses local files on Windows, it may run into permission problems or DLL errors. This is because Spark needs Hadoop's winutils, so we must first configure the Hadoop-related environment. You can download it from GitHub: /4ttty/winutils
gitcode provides a mirror for faster downloads: /mirrors/4ttty/winutils
I chose the highest Hadoop version available in this repository, 3.0.0, extracted it into the D:\deploy\hadoop-3.0.0 directory, and configured the environment variables:
We also need to copy the corresponding DLL from the bin directory into the system directory, expressed as a command:
copy D:\deploy\hadoop-3.0.0\bin\ C:\Windows\System32
Note that this step requires administrator privileges.
To be able to use the winutils command-line tool from anywhere, add the %HADOOP_HOME%\bin directory to the PATH environment variable:
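If you prefer not to change system-wide settings, the same variables can also be set per process from Python before the SparkSession is created. A minimal sketch, assuming the files were extracted to D:\deploy\hadoop-3.0.0 as above:

```python
import os

# Point Spark at the winutils installation (path assumed from the step above)
os.environ["HADOOP_HOME"] = r"D:\deploy\hadoop-3.0.0"
# Make winutils.exe discoverable on PATH for this process only
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]
```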
Installing pyspark and Java
First, we install the latest version of pyspark available at the time of writing (2022-02-17):
pip install pyspark==3.2.1
Note that the version of pyspark constrains the maximum JDK version. For example, if you install pyspark 2.4.5 you can only use JDK 1.8; otherwise you will get the error: Unsupported class file major version 55
This is because pyspark bundles Scala, a JVM-based programming language, and there are compatibility constraints between Scala and the JDK version. JDK and Scala version compatibility table:
JDK version | Minimum Scala versions | Recommended Scala versions |
---|---|---|
17 | 2.13.6, 2.12.15 (forthcoming) | 2.13.6, 2.12.15 (forthcoming) |
16 | 2.13.5, 2.12.14 | 2.13.6, 2.12.14 |
13, 14, 15 | 2.13.2, 2.12.11 | 2.13.6, 2.12.14 |
12 | 2.13.1, 2.12.9 | 2.13.6, 2.12.14 |
11 | 2.13.0, 2.12.4, 2.11.12 | 2.13.6, 2.12.14, 2.11.12 |
8 | 2.13.0, 2.12.0, 2.11.0, 2.10.2 | 2.13.6, 2.12.14, 2.11.12, 2.10.7 |
6, 7 | 2.11.0, 2.10.0 | 2.11.12, 2.10.7 |
The current pyspark 3.2.1 bundles Scala 2.12.15, which means every JDK version up to 17 is supported.
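If you want to double-check which Scala version your installed pyspark actually bundles, one option (a small sketch, assuming a pip-installed pyspark) is to look at the scala-library jar that ships inside the package:

```python
import glob
import os
import pyspark

# The Scala runtime ships inside pyspark's jars directory;
# the scala-library jar filename carries the version, e.g. scala-library-2.12.15.jar
jars_dir = os.path.join(os.path.dirname(pyspark.__file__), "jars")
print(glob.glob(os.path.join(jars_dir, "scala-library-*.jar")))
```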
Here I still chose to install JDK 8:
Test it:
>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
A detailed JDK 11 installation tutorial (the official site only provides an installer for JDK 1.8, with no portable zip archive):
Portable Java 11 environment configuration and calling Java from Python
/article/details/118366166
graphframes installation
Install the current latest graphframes with pip:
pip install graphframes==0.6
Then download the graphframes jar package from the official website.
Download: /package/graphframes/graphframes
Since the installed version of pyspark is 3.2, I chose this jar package here:
The jar package is then placed in the jars directory of the pyspark installation directory:
The location of the pyspark installation can be viewed via pip:
C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: /apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@
License: /licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:
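Instead of locating the directory by hand, a small sketch like the following copies the downloaded jar into pyspark's jars directory (the jar filename below is only a placeholder for whichever build you actually downloaded):

```python
import os
import shutil
import pyspark

# pyspark's bundled jars live next to the installed package
jars_dir = os.path.join(os.path.dirname(pyspark.__file__), "jars")
# Hypothetical filename: substitute the graphframes jar you downloaded
shutil.copy(r"D:\Downloads\graphframes-0.8.2-spark3.2-s_2.12.jar", jars_dir)
print("copied into", jars_dir)
```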
Usage
The best place to learn pyspark is the official documentation: /docs/latest/
An official live Jupyter notebook is available on the following page:
/docs/latest/api/python/getting_started/
Start spark and read the data
Start spark in local mode:
from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
The SparkSession output contains a link to the Spark web UI; opening it in a new browser tab shows the UI.
Click the Environment tab to see the variables in the current environment:
Enabling Hive support
Find the location where pyspark is installed; on my machine it is D:\Miniconda3\Lib\site-packages\pyspark
Manually create a conf directory there and copy the Hive configuration files into it. If Hive uses MySQL as its metastore database, you also need to put the corresponding MySQL driver jar into Spark's jars directory.
When creating the SparkSession object, call enableHiveSupport() to turn on Hive support:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
When Spark accesses a table created by Hive itself, the following permission error may occur:
Caused by: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
This is because the current user does not have permission to operate on \tmp\hive:
>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
Change the permissions of the \tmp\hive directory to 777 for smooth access:
>winutils chmod 777 \tmp\hive
>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
DataFrame and RDD for Spark
In Spark, the RDD and DataFrame APIs are unified under the Dataset abstraction: a DataFrame is a Dataset[Row]. A DataFrame can be understood as an RDD that carries structured (schema) information.
To convert an RDD of Row objects into a DataFrame, simply call its toDF method or the SparkSession's createDataFrame method; you can also pass in a schema to override column names or types.
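As a short sketch of the two conversion routes (the data and column names here are purely illustrative):

```python
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType

rdd = sc.parallelize([Row(name="Alice", age=34), Row(name="Bob", age=36)])

# Route 1: let Spark infer the schema from the Row objects
df1 = rdd.toDF()

# Route 2: pass an explicit schema to control names and types
schema = StructType([StructField("name", StringType()),
                     StructField("age", LongType())])
df2 = spark.createDataFrame(rdd, schema)
df2.printSchema()
```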
Basic DataFrame API
DataFrame supports DSL style syntax by default, for example:
# View the contents of the DataFrame
df.show()
# View selected columns of the DataFrame
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
# Print the schema of the DataFrame
df.printSchema()
# Filter rows whose age is greater than 21
df.filter(df['age'] > 21).show()
# Group by age and count the number of people of each age
df.groupBy("age").count().show()
After registering a DataFrame as a table or view, you can perform pure SQL operations:
("people") //("t_person") // Inquire about the top two oldest ("select * from t_person order by age desc limit 2").show() //Display the Schema information of the table. ("desc t_person").show()
Pyspark can register a udf directly and then use it conveniently:
strlen = ("len", lambda x: len(x)) print(("SELECT len('test') length").collect()) print(("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())
Execution results:
[Row(length='4')]
[Row(length='3')]
Introduction to RDD
The essence of a DataFrame is a wrapper around an RDD; it can be understood as DataFrame = RDD[Row] + schema.
RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents an immutable, automatically fault-tolerant, scalable, partitionable collection whose elements can be computed in parallel.
Each RDD has five main properties:
- A list of partitions
- A compute function that operates on each partition
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs
- Optionally, a list of preferred locations for computing each partition
**A list of partitions**, i.e. the basic units that make up the dataset. Each partition is processed by one computation task, which determines the granularity of parallelism. The user can specify the number of partitions when creating the RDD; if not specified, the default value is used, which is the number of CPU cores allocated to the program.
**A function that computes each partition.** RDDs in Spark are computed partition by partition, and each RDD implements a compute function for this purpose. The compute function composes the iterators and does not need to save the result of each computation.
**Dependencies between RDDs.** Each transformation of an RDD produces a new RDD, so a pipeline-like lineage of dependencies forms between RDDs. If some partition data is lost, Spark can recompute just the lost partitions through this lineage instead of recomputing all partitions of the RDD.
**A Partitioner, i.e. the partitioning function of the RDD.** Spark currently implements two kinds of partitioning functions: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for non-key-value RDDs the Partitioner is None. The Partitioner determines not only the number of partitions of the RDD itself, but also the number of partitions of the parent RDD's shuffle output.
**A list storing the preferred location for each partition.** For an HDFS file, this list is the location of the blocks where each partition resides. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule computation tasks onto the nodes that store the data blocks they process.
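A few of these properties can be inspected directly from pyspark; a small sketch:

```python
rdd = sc.parallelize(range(100), 4)

print(rdd.getNumPartitions())   # number of partitions: 4
print(rdd.partitioner)          # None: not a key-value RDD partitioned by key

# After a shuffle on a key-value RDD, a partitioner is attached
pairs = rdd.map(lambda x: (x % 3, x)).reduceByKey(lambda a, b: a + b)
print(pairs.partitioner)
```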
RDD API Overview
RDDs provide Transformation APIs and Action APIs. Transformations are lazily evaluated: they merely record the transformations to apply to the underlying dataset, and these transformations actually run only when an Action API is executed.
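A quick illustration of this laziness (a sketch):

```python
rdd = sc.parallelize(range(10))

# Nothing is computed yet: map and filter only record the transformations
doubled = rdd.map(lambda x: x * 2).filter(lambda x: x > 10)

# The whole chain runs only when an action such as collect() is called
print(doubled.collect())   # [12, 14, 16, 18]
```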
The two most important kinds of RDDs produced by the Transformation APIs are MapPartitionsRDD and ShuffledRDD.
The operators that produce a MapPartitionsRDD are map, keyBy, keys, values, flatMap, mapValues, flatMapValues, mapPartitions, mapPartitionsWithIndex, glom, filter, and filterByRange. The most commonly used are map and flatMap, but anything that produces a MapPartitionsRDD can also be implemented directly with mapPartitions or mapPartitionsWithIndex.
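For instance, in pyspark the effect of map and filter can be reproduced with mapPartitions; a sketch:

```python
rdd = sc.parallelize(range(10), 2)

# map(lambda x: x * 2) expressed with mapPartitions
mapped = rdd.mapPartitions(lambda it: (x * 2 for x in it))

# filter(lambda x: x % 2 == 0) expressed with mapPartitions
filtered = rdd.mapPartitions(lambda it: (x for x in it if x % 2 == 0))

print(mapped.collect())
print(filtered.collect())
```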
The operators that produce a ShuffledRDD are combineByKeyWithClassTag, combineByKey, aggregateByKey, foldByKey, reduceByKey, distinct, groupByKey, groupBy, partitionBy, sortByKey and repartitionAndSortWithinPartitions.
The operators from combineByKey through groupByKey all ultimately call combineByKeyWithClassTag:
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
The three important parameters mean the following (see the pyspark sketch after this list):
- createCombiner: creates an initial combined value from the first element of a key within each partition
- mergeValue: merges the remaining elements of that key within the partition into the combined value
- mergeCombiners: merges the combined results of all partitions
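The same three parameters exist on pyspark's combineByKey. As a sketch, computing a per-key average (the sample data here is made up):

```python
pairs = sc.parallelize([("cat", 2), ("cat", 5), ("mouse", 4), ("dog", 12)], 2)

avg = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner: first value of a key -> (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue: fold further values within the partition in
    lambda a, b: (a[0] + b[0], a[1] + b[1])    # mergeCombiners: merge the per-partition results
).mapValues(lambda t: t[0] / t[1])

print(avg.collect())   # e.g. [('cat', 3.5), ('mouse', 4.0), ('dog', 12.0)]
```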
When the partitioner of groupByKey is not specified, the defaultPartitioner is passed. For example:
val a = (List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length) res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
aggregateByKey: within each partition, uses zeroValue as the initial value and iterates over the elements with seqOp to merge them; the per-partition results are then merged with combOp. Example:
val pairRDD = sc.parallelize(List(("cat",2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
reduceByKey: within each partition, iterates over the elements and merges them with func; the per-partition results are then merged again with func. For example:
val a = (List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = (x => (, x)) (_ + _).collect
The Action APIs include the following (a few of them are demonstrated in the sketch after the table):
Action | Meaning |
---|---|
reduce(func) | Aggregates all elements of the RDD through the function func, which must be commutative and associative so it can be computed correctly in parallel |
collect() | In the driver, return all elements of the dataset as an array |
count() | Returns the number of elements of the RDD |
first() | Returns the first element of the RDD (similar to take(1)) |
take(n) | Returns an array consisting of the first n elements of the dataset |
takeSample(withReplacement, num, [seed]) | Returns an array of num elements randomly sampled from the dataset, with or without replacement; seed specifies the random number generator seed |
takeOrdered(n, [ordering]) | Sort and take the first N elements |
saveAsTextFile(path) | Save the elements of the dataset as a textfile to the HDFS file system or another supported file system, and for each element, Spark will call the toString method to convert it to text in the file |
saveAsSequenceFile(path) | Saves the elements of the dataset to a specified directory in Hadoop sequencefile format, which can be on HDFS or another Hadoop-supported file system. |
saveAsObjectFile(path) | Save the elements of the RDD in sequencefile format with NullWritable as key and the actual element as value. |
countByKey() | For RDDs of type (K,V), returns a map of (K,Int) indicating the number of elements corresponding to each key. |
foreach(func) | On each element of the dataset, the function func is run to update it. |
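A few of these actions in pyspark, as a quick sketch:

```python
rdd = sc.parallelize([5, 3, 1, 4, 2])

print(rdd.count())                      # 5
print(rdd.first())                      # 5
print(rdd.take(3))                      # [5, 3, 1]
print(rdd.takeOrdered(3))               # [1, 2, 3]
print(rdd.reduce(lambda a, b: a + b))   # 15

pairs = rdd.map(lambda x: ("even" if x % 2 == 0 else "odd", x))
print(pairs.countByKey())               # counts per key: odd -> 3, even -> 2
```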
Simulating the MapReduce version of wordcount in Spark:
object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    import org.apache.spark.rdd.HadoopRDD
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import scala.collection.mutable.ArrayBuffer

    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }

    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }

    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0) reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.collect.foreach(println)
  }
}
Various RDDs
- ShuffledRDD : Indicates a network transmission that requires a Shuffle process.
- CoalescedRDD : Used to merge multiple partitions of a machine into a single partition
- CartesianRDD : produce a Cartesian product of all elements of two RDDs
- MapPartitionsRDD : Used for specific processing of data in each partition
- CoGroupedRDD : Used for aggregating 2~4 rdd's according to key
- SubtractedRDD : for finding the difference set of 2 RDDs
- UnionRDD and PartitionerAwareUnionRDD : Used to compute the union of 2 RDDs
- ZippedPartitionsRDD2: RDD generated by zip zipper operation
- ZippedWithIndexRDD: mark each element with a self-incrementing number
- PartitionwiseSampledRDD: used to randomly sample the elements of the rdd by a specified percentage
When we need to add an auto-incrementing id column to a DataFrame, we can use the zipWithUniqueId method:
from pyspark.sql.types import StructType, StructField, LongType

schema = df.schema.add(StructField("id", LongType()))
rowRDD = df.rdd.zipWithUniqueId().map(lambda t: t[0] + Row(t[1]))
data = spark.createDataFrame(rowRDD, schema)
data.show()
API usage details can be found at: /docs/latest/api/python/reference/api/#
cache&checkpoint
RDDs can cache the results of previous computations through the persist or cache method. Caching does not happen immediately when these methods are called; instead, when a later action is triggered, the RDD is cached in the memory of the compute nodes and reused afterwards.
rdd.cache()
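persist accepts an explicit storage level when memory-only caching is not appropriate; a sketch:

```python
from pyspark import StorageLevel

# cache() is shorthand for persist(StorageLevel.MEMORY_ONLY);
# MEMORY_AND_DISK also spills to disk when the data does not fit in memory
rdd.persist(StorageLevel.MEMORY_AND_DISK)

rdd.count()      # the first action materializes and caches the data
rdd.unpersist()  # release the cached blocks when no longer needed
```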
The source code comments for checkpoint say:
- Mark this RDD for checkpointing.
- It will be saved to a file inside the checkpoint directory set with SparkContext#setCheckpointDir.
- All references to its parent RDDs will be removed.
- This method must be called before any job has been executed on this RDD.
- It is strongly recommended that this RDD be persisted in memory, otherwise saving it to a file will require recomputation.
From this we learn that it is best to cache the RDD at the same time as calling checkpoint; otherwise checkpointing will trigger a recomputation.
("checkpoint") () ()
Usage of graphframes
GraphFrame is a Graph manipulation interface that unifies the Graph algorithms in Spark into the DataFrame interface, providing a unified graph processing API for Scala, Java and Python.
Graphframes is an open source project; the source code is available at: /graphframes/graphframes
For reference:
- Official website: /graphframes/docs/_site/
- GraphFrames User Guide - Python - Databricks documentation: /spark/latest/graph-analysis/graphframes/
In GraphFrames, the vertices and edges of a graph are stored as DataFrames:
- Vertex DataFrame: must contain a column named "id" that uniquely identifies each vertex.
- Edge DataFrame: must contain columns named "src" and "dst", which reference the source and destination vertices by their ids.
Example of creating a graph:
from graphframes import GraphFrame

vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])
# Build the graph
g = GraphFrame(vertices, edges)
GraphFrame provides three views:
print("Vertex table view:") () # It's the original vertices # print("Side table view:") () # It's the original edges print("Ternary view:") ()
Get the degree, in-degree and out-degree of a vertex:
# Degree of each vertex
g.degrees.show()
# In-degree of each vertex
g.inDegrees.show()
# Out-degree of each vertex
g.outDegrees.show()
Motif finding
Example:
# Multiple path conditions
motif = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
# Filter the search results
motif.filter("b.age > 30")
# Use anonymous vertices and edges when those path elements are not needed in the result
motif = g.find("(start)-[]->()")
# Require that a path does not exist
motif = g.find("(a)-[]->(b); !(b)-[]->(a)")
Suppose we want to recommend users to follow. We can look for the pattern: A follows B and B follows C, but A does not yet follow C; finding such relationships lets us recommend C to A:
from pyspark.sql.functions import col

# Motif: A->B->C but not A->C
results = g.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# Exclude A itself
results = results.filter("A.id != C.id")
# Select the desired columns
results = results.select(col("A.id").alias("A"), col("C.id").alias("C"))
results.show()
Results:
+---+---+
| A| C|
+---+---+
| e| c|
| e| a|
| d| b|
| a| d|
| f| b|
| d| e|
| a| f|
| a| c|
+---+---+
Motifs can also carry state along the path during the search. For example, suppose we want to find chains of 4 vertices in which all 3 edges are "friend" relationships:
from pyspark.sql.functions import col, lit, when
from functools import reduce

chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

def sumFriends(cnt, relationship):
    """State update for the next edge: add 1 to cnt if the relationship is 'friend'."""
    return when(relationship == "friend", cnt + 1).otherwise(cnt)

# Apply the update over the whole chain: add one for each of the three edges that is a friend relationship
condition = reduce(lambda cnt, e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()
Results:
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| a| ab| b| bc| c| cd| d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}| {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
Subgraphs
A GraphFrame can filter its vertices or edges directly with filterVertices and filterEdges; the dropIsolatedVertices()
method removes isolated vertices that are left without any edge:
("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()
Subgraphs can also be created from the edges obtained through motif finding:
paths = ("(a)-[e]->(b)")\ .filter(" = 'follow'")\ .filter(" < ") # Extract edge information e2 = ("", "", "") e2 = ("e.*") # Create Subgraphs g2 = GraphFrame(, e2)
GraphX algorithms supported by GraphFrames
- PageRank: find important vertices in a graph.
- Breadth-first search (BFS): find the shortest path from a set of vertices to another set of vertices.
- Connected components (ConnectedComponents): assign the same component ID to vertices that are connected to each other.
- Strongly connected components (StronglyConnectedComponents): assign each vertex to its strongly connected component.
- Shortest paths: find the shortest path from each vertex to a given set of landmark vertices.
- Triangle count (TriangleCount): count the number of triangles each vertex belongs to; often used to judge the stability of a group (more interconnections mean more stability) or as part of other network metrics (such as the clustering coefficient) used to detect communities in social network analysis.
- Label propagation algorithm (LPA): detect communities in the graph.
The pageRank algorithm:
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.orderBy("pagerank", ascending=False).show()
Results:
+---+-------+---+-------------------+
| id| name|age| pagerank|
+---+-------+---+-------------------+
| b| Bob| 36| 2.7025217677349773|
| c|Charlie| 30| 2.6667877057849627|
| a| Alice| 34| 0.4485115093698443|
| e| Esther| 32| 0.3613490987992571|
| f| Fanny| 36|0.32504910549694244|
| d| David| 29|0.32504910549694244|
| g| Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+
You can set the starting vertex:
g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
g.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)
Breadth-first search BFS:
Search for the shortest path from the vertex named Esther to vertices with age less than 32:
paths = ("name = 'Esther'", "age < 32") ()
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+
You can also restrict the search to specified edges:
("name = 'Esther'", "age < 32", edgeFilter="relationship != 'friend'", maxPathLength=4 ).show()
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+
Connected components:
Checkpoints must be set first:
("checkpoint") ().show()
Results:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
| g| Gabby| 60|146028888064|
+---+-------+---+------------+
We can see that only vertex g sits in its own component; calling the dropIsolatedVertices()
method removes such isolated unconnected vertices:
g.dropIsolatedVertices().connectedComponents().show()
Results:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
+---+-------+---+------------+
Strongly connected components:
g.stronglyConnectedComponents(maxIter=10).show()
Shortest paths:
The shortest path from each vertex to a or d:
g.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+
Triangle count:
g.triangleCount().show()
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+
This shows that vertices a, e and d form a triangle.
Label Propagation Algorithm (LPA):
g.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+
Integration with pandas
Since version 3.0, pyspark has provided the pandas_udf decorator, applyInPandas, and mapInPandas; with these methods we can process Spark data using the familiar pandas syntax.
First, create some test data and enable Apache Arrow:
("", "true") df = ( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) ()
Customizing UDFs and UDAFs
Custom UDTFs are not supported by pyspark at this time.
Using the pandas_udf decorator we can create pandas-based udf custom functions that can be used directly in the DSL syntax:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

df.select(multiply_func("id", "v").alias("product")).show()
After registering functions and views, you can use them directly in SQL:
("t") ("multiply", multiply_func) ('select multiply(id, v) product from t').show()
In both cases the result is:
+-------+
|product|
+-------+
| 1.0|
| 2.0|
| 6.0|
| 10.0|
| 20.0|
+-------+
Aggregate and window functions are also supported:
from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

# Compute the mean of column 'v'
df.select(mean_udf('v').alias("mean_v")).show()
# Compute the mean of 'v' grouped by 'id'
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# Group by 'id', compute the mean of 'v' and attach it as a new column
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()
The same can be achieved using SQL directly after registering to udf:
("mean2", mean_udf) ('select mean2(v) mean_v from t').show() ('select id,mean2(v) mean_v from t group by id').show() ('select id,v,mean2(v) over(partition by id) mean_v from t').show()
In both cases the results are:
+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+
Grouping Aggregation with JOIN
applyInPandas must be used after calling groupby on the DataFrame:
def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf

t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()
Results:
+---+----+----+----+
| id| v| v1| v2|
+---+----+----+----+
| 1| 1.0|-0.5| 2.5|
| 1| 2.0| 0.5| 3.5|
| 2| 3.0|-3.0| 9.0|
| 2| 5.0|-1.0|11.0|
| 2|10.0| 4.0|16.0|
+---+----+----+----+
The subtract_mean function receives the pandas DataFrame corresponding to each id, and the schema specifies the names and types of the returned columns.
The following Scala code shows what cogroup does; applyInPandas can perform table joins with the help of cogroup:
val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))

scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))
Example:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    # l and r are pandas DataFrames
    # grouped by id: l and r hold the df1 and df2 rows for the corresponding id
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
Map Iteration
Execute the following code:
def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf

df.mapInPandas(filter_func, schema=df.schema).show()
The backend sees the result of the execution as:
0 [[2.0, 5.0]]
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]
The foreground result stays essentially the same as df. You can tell that iterator is a partition iterator: each element it yields is a batch of rows from the current partition wrapped in a pandas DataFrame.
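Since each batch is a full pandas DataFrame, mapInPandas can also genuinely filter or reshape the data; a small sketch against the same df:

```python
def keep_large_v(iterator):
    for pdf in iterator:
        # ordinary pandas boolean indexing on each batch
        yield pdf[pdf.v > 2]

df.mapInPandas(keep_large_v, schema=df.schema).show()
```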
Pyspark Interaction with Pandas
Converting Spark's DataFrame object to a native pandas object is simply a matter of calling the toPandas() method:
df.toPandas()
Converting a native pandas object to a Spark object can be done with Spark's top-level method:
spark.createDataFrame(pdf)
If you are used to pandas, you can also use pandas-on-Spark directly; in Spark 3.2.0 it matches the pandas 1.3 API. With pandas-on-Spark we can manipulate data entirely through the pandas API while the underlying execution is parallelized by Spark.
When using pandas-on-Spark it is a good idea to set this environment variable:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
Convert spark objects to pandas-on-spark objects:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf
A pandas-on-Spark object can also be converted back to a Spark object:
pdf.to_spark()
Spark also provides an API to read files directly into pandas-on-Spark objects, for example:
import pyspark.pandas as ps
pdf = ps.read_csv("example_csv.csv")
The ps object has a nearly identical API to native pandas, and also supports some powerful extras, such as querying it directly with SQL:
("SELECT count(*) as num FROM {pdf}")
{pdf} refers to the pandas-on-Spark object whose variable name is pdf.
This concludes this article on installing and using PySpark and GraphFrames. For more on PySpark and GraphFrames, please see my other related articles; I hope you will continue to support me.