# Challenge 5
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, avg, explode_outer
spark = SparkSession.builder.getOrCreate()
# Given the following schema for a dataframe `df`:
# Instrument
# |-- isin: string (nullable = true)
# |-- instrument_rating: struct (nullable = true)
# | |-- directionOfChange: string (nullable = true)
# | |-- effectiveDate: date (nullable = true)
# | |-- provider: string (nullable = true)
# | |-- rating: string (nullable = true)
# |-- organization_type: struct (nullable = true)
# | |-- type: string (nullable = true)
# | |-- code: string (nullable = true)
# |-- convertibleOrExchangeable: array (nullable = true)
# | |-- coCoTriggerDescription: struct (containsNull = true)
# | | |-- convertibleTriggerLevel: string (nullable = true)
# | | |-- endDate: date (nullable = true)
# | | |-- parValue: integer (nullable = true)
# | | |-- result: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- price: double (nullable = true)
# | | |-- ratio: integer (nullable = true)
# | | |-- startDate: date (nullable = true)
# | | |-- type: integer (nullable = true)
# Define schema
schema = StructType([
    StructField("isin", StringType(), True),
    StructField("instrument_rating", StructType([
        StructField("directionOfChange", StringType(), True),
        StructField("effectiveDate", DateType(), True),
        StructField("provider", StringType(), True),
        StructField("rating", StringType(), True)
    ]), True),
    StructField("organization_type", StructType([
        StructField("type", StringType(), True),
        StructField("code", StringType(), True)
    ]), True),
    StructField("convertibleOrExchangeable", ArrayType(
        StructType([
            StructField("coCoTriggerDescription", StructType([
                StructField("convertibleTriggerLevel", StringType(), True),
                StructField("endDate", DateType(), True),
                StructField("parValue", IntegerType(), True),
                StructField("result", ArrayType(StringType(), True), True),
                StructField("price", DoubleType(), True),
                StructField("ratio", IntegerType(), True),
                StructField("startDate", DateType(), True),
                StructField("type", IntegerType(), True)
            ]), True)
        ]), True
    ), True)
])
# Create an empty DataFrame
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
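# Sanity check (optional sketch): printing the schema should reproduce the layout documented above.
df.printSchema()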
# In PySpark, Scala Spark, or Spark SQL,
# write a query to display all rows where the `effectiveDate` is greater than 2022-01-01.
# filtered_df = df.filter(df.instrument_rating.effectiveDate > "2022-01-01")
# filtered_df.show()
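# The same filter expressed in Spark SQL (a sketch; the temp view name "instrument" is only illustrative):
# df.createOrReplaceTempView("instrument")
# spark.sql("SELECT * FROM instrument WHERE instrument_rating.effectiveDate > DATE '2022-01-01'").show()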
# Create a flattened Delta table with one row per `coCoTriggerDescription` struct in the
# `convertibleOrExchangeable` array. When creating the table, consider performance optimizations;
# this table will be about 100,000 rows. Think about the core data you have and whether you want to
# add any other columns (you may or may not, depending on your approach).
# Explode the convertibleOrExchangeable array so each coCoTriggerDescription becomes its own row:
# flattened_df = df.select(
#     "isin",
#     "instrument_rating.directionOfChange",
#     "instrument_rating.effectiveDate",
#     "instrument_rating.provider",
#     "instrument_rating.rating",
#     "organization_type.type",
#     "organization_type.code",
#     explode_outer("convertibleOrExchangeable").alias("coe")
# ).select(
#     "*",
#     "coe.coCoTriggerDescription.convertibleTriggerLevel",
#     "coe.coCoTriggerDescription.endDate",
#     "coe.coCoTriggerDescription.parValue",
#     col("coe.coCoTriggerDescription.result").alias("resultArray"),
#     "coe.coCoTriggerDescription.price",
#     "coe.coCoTriggerDescription.ratio",
#     "coe.coCoTriggerDescription.startDate",
#     col("coe.coCoTriggerDescription.type").alias("trigger_type")
# ).drop("coe")
# flattened_df.show()
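# A sketch of persisting the flattened result as a Delta table (this assumes the Delta Lake package is
# configured on the session; the table name "instrument_coco_flat" is only illustrative). At roughly
# 100,000 rows the table is small, so writing it as a single file avoids the many-tiny-files problem;
# heavier partitioning is unlikely to help at this size.
# flattened_df.coalesce(1).write.format("delta").mode("overwrite").saveAsTable("instrument_coco_flat")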
# Write a query to display all of the unique values for `code` in the `organization_type` column.
# df.select("organization_type.code").distinct().show()
# Add a new column "organization_type" to the Instrument table that extracts the value from "type" within
# the organization_type struct.
# newCol_df = df.withColumn("organization_type", col("organization_type.type"))
#
# newCol_df.show()
# Write a mini-pipeline that extracts ratings by provider (S&P, Fitch, Moody's) from instrument_rating, assigns
# each rating its weight from the mapping below, and averages the 3 ratings per ISIN (International Security
# Identification Number).
ratings_weights = [
    ("AAA", "Aaa", 200),
    ("AA+", "Aa1", 190),
    ("AA", "Aa2", 180),
    ("AA-", "Aa3", 170),
    ("A+", "A1", 160),
    ("A", "A2", 150),
    ("A-", "A3", 140),
    ("BBB+", "Baa1", 130),
    ("BBB", "Baa2", 120),
    ("BBB-", "Baa3", 110),
    ("BB+", "Ba1", 100),
    ("BB", "Ba2", 90),
    ("BB-", "Ba3", 80),
    ("B+", "B1", 70),
    ("B", "B2", 60),
    ("B-", "B3", 50),
    ("CCC+", "Caa1", 40),
    ("CCC", "Caa2", 40),
    ("CCC-", "Caa3", 40),
    ("CC", "Ca", 30),
    ("C", "C", 20),
    ("D", "D", 10)
]
ratingsWeights_df = spark.createDataFrame(ratings_weights, ["S&P", "Moodys/Fitch", "Weight"])
ratingsWeights_df.show()
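# A minimal sketch of the ratings pipeline, assuming `df` holds one row per (ISIN, provider), so each ISIN
# carries up to three instrument_rating entries (S&P, Fitch, Moody's). Each rating string is matched against
# whichever notation column of the weights table it uses, and the mapped weights are then averaged per ISIN.
ratings_df = df.select(
    "isin",
    col("instrument_rating.provider").alias("provider"),
    col("instrument_rating.rating").alias("rating")
)
# Map each rating symbol to its numeric weight, whichever notation it is quoted in.
weighted_df = ratings_df.join(
    ratingsWeights_df,
    (ratings_df.rating == ratingsWeights_df["S&P"]) |
    (ratings_df.rating == ratingsWeights_df["Moodys/Fitch"]),
    "left"
)
# Average the mapped weights per ISIN (up to three provider ratings each).
avg_rating_df = weighted_df.groupBy("isin").agg(avg("Weight").alias("avg_rating_weight"))
avg_rating_df.show()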