How to concatenate columns in a Spark dataframe?
Concatenate functions in Spark SQL
The module pyspark.sql.functions provides two concatenate functions:
- concat – concatenates multiple input columns into a single column. If any input value is null, the result is null.
- concat_ws – concatenates multiple input string columns into a single string column, placing the given separator between the values. Null values are skipped.
In this tutorial, we use a PySpark program to explain the concatenate functions.
Syntax
concat(string_1, string_2, ..., string_n)
concat_ws(separator, string_1, string_2, ..., string_n)
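- string_1, string_2, …, string_n – the input string columns (or expressions) to join together.
- separator – the string placed between consecutive values. In PySpark, concat_ws() takes it as a plain Python string, not as a column.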
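In Spark, concat() returns null as soon as any input is null, while concat_ws() skips null inputs and puts the separator only between the remaining values. A small pure-Python model can make that rule concrete for a single row. This is an illustration only, not Spark's implementation: the helper names concat_model and concat_ws_model are hypothetical, None stands in for SQL null, and the null-separator case is not modeled.

```python
from typing import Optional


def concat_model(*values: Optional[str]) -> Optional[str]:
    # Models concat(): one null input makes the whole result null.
    if any(v is None for v in values):
        return None
    return "".join(values)


def concat_ws_model(sep: str, *values: Optional[str]) -> str:
    # Models concat_ws(): null inputs are dropped, sep goes between the rest.
    return sep.join(v for v in values if v is not None)


print(concat_model("Garfield St", "Hartford"))                # Garfield StHartford
print(concat_model("Garfield St", None))                      # None
print(concat_ws_model(",", "Garfield St", "Hartford"))        # Garfield St,Hartford
print(concat_ws_model(",", "Garfield St", None, "Hartford"))  # Garfield St,Hartford
```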
Example 1: Using concat() function to join columns in Spark dataframe
First, we create a Spark dataframe with each customer’s street and city details.
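from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, concat_ws, lit

# create (or reuse) a SparkSession; the pyspark shell already provides one named spark
spark = SparkSession.builder.getOrCreate()

data = [(124, 'Garfield St', 'Hartford'),
        (192, 'Louise St', 'Miami'),
        (489, 'Boye St', 'Dallas')]
columns = ['Customer_Id', 'Street', 'City']

# create dataframe
df = spark.createDataFrame(data, columns)
df.show()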
Output
Let’s join the Street and City columns to get each customer’s mailing address. In the code below, the concat function concatenates the columns “Street” and “City”, and the alias() method gives the derived column the name “Address”. The new dataframe is saved as address_df.
# concatenate columns in spark dataframe
address_df = df.select(
    concat(col("Street"), col("City"))
    .alias("Address")
)
address_df.show()
Output
The derived column “Address” in the dataframe “address_df” holds the concatenated values. Because no separator was given, the street and city run together, for example “Garfield StHartford”.
To make the address easier to read, let’s add a comma (,) between the street and city values. This time we also keep the existing columns Customer_Id, Street, and City in the dataframe “address_df”.
- lit() creates a column with a literal (constant) value; here the literal is a comma (,). It is needed because concat() treats a plain string argument as a column name.
- col() returns the column with the given name.
# concatenate columns with alias name in spark dataframe
address_df = df.select(
    col('Customer_Id'), col('Street'), col('City'),
    concat(col("Street"), lit(','), col("City")).alias("Address")
)
address_df.show()
Output
Example 2: Concatenate columns using concat() and withColumn() function
The withColumn() method returns a new dataframe with a column added, or replaced if a column with that name already exists. Let’s use withColumn() for the concatenate operation. Make sure the new column name is not already present in the dataframe; otherwise withColumn() overwrites the values of that column.
print("-- dataframe : df ----")
df.show()

# withColumn() to create the new column "Address"
address_df = df.withColumn("Address",
                           concat(col("Street"), lit(','), col("City")))
print("-- dataframe : address_df ----")
address_df.show()
Output
withColumn() added the new column “Address” to the dataframe address_df while keeping all the original columns. Because the concat expression was passed to withColumn(), the new column holds the concatenated values.
Example 3: Using concat_ws() function to concatenate with delimiter
In the previous example, we used the lit() function to add the comma (,) separator between the Street and City values. The concat_ws() function adds such a separator for us. Let’s rewrite the previous code with concat_ws() in PySpark.
# use concat_ws function to add the separator
address_df = df.withColumn("Address",
                           concat_ws(',', col("Street"), col("City")))
address_df.show()
Output
The comma (,) separator is added between the Street and City values, and the new column “Address” is created in the dataframe “address_df”.
Note that any number of columns can be passed to concat_ws(). The function inserts the separator between each pair of adjacent values.
Example 4: Using concat() on the temporary view of Spark dataframe
The concatenate operation can also be written as a regular SQL statement in Spark. To do so, we first create a temporary view from the Spark dataframe.
- First, use createOrReplaceTempView() to create a temporary view named “CustomerDetails”.
- Then write a select query that applies the concat function to the view’s columns.
# create temporary view from dataframe df
df.createOrReplaceTempView("CustomerDetails")

# select query with concat operation on view "CustomerDetails"
address_df = spark.sql(
    "select Customer_Id, Street, City, concat(Street, ',', City) as Address "
    "from CustomerDetails"
)
address_df.show()
Output
spark.sql() runs the query against the temporary view and returns the result as a dataframe, so address_df contains the same comma-separated addresses as in the earlier examples.
Complete Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, concat_ws, lit

data = [(124, 'Garfield St', 'Hartford'),
        (192, 'Louise St', 'Miami'),
        (489, 'Boye St', 'Dallas')]
columns = ['Customer_Id', 'Street', 'City']

# Create PySpark SparkSession
spark = SparkSession.builder.master('local') \
    .appName('concat_and_concat_ws_function_examples') \
    .getOrCreate()

# Example 1: concat() function
# create dataframe
df = spark.createDataFrame(data, columns)
df.show()

# concatenate columns in spark dataframe
address_df = df.select(
    concat(col("Street"), col("City"))
    .alias("Address")
)
address_df.show()

# concatenate columns with alias name in spark dataframe
address_df = df.select(
    col('Customer_Id'), col('Street'), col('City'),
    concat(col("Street"), lit(','), col("City")).alias("Address")
)
address_df.show()

# Example 2: concat() and withColumn()
print("-- dataframe : df ----")
df.show()

# withColumn() to create the new column "Address"
address_df = df.withColumn("Address",
                           concat(col("Street"), lit(','), col("City")))
print("-- dataframe : address_df ----")
address_df.show()

# Example 3: concat_ws() function
# use concat_ws function to add the separator
address_df = df.withColumn("Address",
                           concat_ws(',', col("Street"), col("City")))
address_df.show()

# Example 4: concat() function on the view of Spark dataframe
# create temporary view from dataframe df
df.createOrReplaceTempView("CustomerDetails")

# select query with concat operation on view "CustomerDetails"
address_df = spark.sql(
    "select Customer_Id, Street, City, concat(Street, ',', City) as Address "
    "from CustomerDetails"
)
address_df.show()