# Challenge 5
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import avg, broadcast, col, explode_outer


spark = SparkSession.builder.getOrCreate()

# Given the following schema for a dataframe `df`:

# Instrument
#  |-- isin: string (nullable = true)
#  |-- instrument_rating: struct (nullable = true)
#  |    |-- directionOfChange: string (nullable = true)
#  |    |-- effectiveDate: date (nullable = true)
#  |    |-- provider: string (nullable = true)
#  |    |-- rating: string (nullable = true)
#  |-- organization_type: struct (nullable = true)
#  |    |-- type: string (nullable = true)
#  |    |-- code: string (nullable = true)
#  |-- convertibleOrExchangeable: array (nullable = true)
#  |    |-- coCoTriggerDescription: struct (containsNull = true)
#  |    |    |-- convertibleTriggerLevel: string (nullable = true)
#  |    |    |-- endDate: date (nullable = true)
#  |    |    |-- parValue: integer (nullable = true)
#  |    |    |-- result: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- price: double (nullable = true)
#  |    |    |-- ratio: integer (nullable = true)
#  |    |    |-- startDate: date (nullable = true)
#  |    |    |-- type: integer (nullable = true)

# Define schema
schema = StructType([
    StructField("isin", StringType(), True),
    StructField("instrument_rating", StructType([
        StructField("directionOfChange", StringType(), True),
        StructField("effectiveDate", DateType(), True),
        StructField("provider", StringType(), True),
        StructField("rating", StringType(), True)
    ]), True),
    StructField("organization_type", StructType([
        StructField("type", StringType(), True),
        StructField("code", StringType(), True)
    ]), True),
    StructField("convertibleOrExchangeable", ArrayType(
        StructType([
            StructField("coCoTriggerDescription", StructType([
                StructField("convertibleTriggerLevel", StringType(), True),
                StructField("endDate", DateType(), True),
                StructField("parValue", IntegerType(), True),
                StructField("result", ArrayType(StringType(), True), True),
                StructField("price", DoubleType(), True),
                StructField("ratio", IntegerType(), True),
                StructField("startDate", DateType(), True),
                StructField("type", IntegerType(), True)
            ]), True)
        ]), True
    ), True)
])

# Create an empty DataFrame
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# In PySpark, Scala Spark, or Spark SQL:

# Write a query to display all rows where the `effectiveDate` is greater than 2022-01-01.
# filtered_df = df.filter(df.instrument_rating.effectiveDate > "2022-01-01")

# filtered_df.show()
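
# The same filter expressed in Spark SQL, since the challenge allows either; the temp view
# name `instrument` is just an illustrative choice, not part of the original challenge.
# df.createOrReplaceTempView("instrument")
# spark.sql(
#     "SELECT * FROM instrument WHERE instrument_rating.effectiveDate > DATE '2022-01-01'"
# ).show()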

# Create a flattened Delta table with one row per `coCoTriggerDescription` struct in the
# `convertibleOrExchangeable` array. When creating the table, consider performance optimizations;
# it will hold roughly 100,000 rows. Think about the core data you have and whether you want to
# add any other columns (you may or may not, depending on your approach).


# flattened_df = df.select(
#     "isin",
#     "instrument_rating.directionOfChange",
#     "instrument_rating.effectiveDate",
#     "instrument_rating.provider",
#     "instrument_rating.rating",
#     col("organization_type.type").alias("organization_type"),
#     col("organization_type.code").alias("organization_code"),
#     # explode the array so each coCoTriggerDescription struct becomes its own row
#     explode_outer("convertibleOrExchangeable").alias("coco"),
# ).select(
#     "*",
#     col("coco.coCoTriggerDescription.convertibleTriggerLevel"),
#     col("coco.coCoTriggerDescription.endDate"),
#     col("coco.coCoTriggerDescription.parValue"),
#     col("coco.coCoTriggerDescription.result").alias("resultArray"),
#     col("coco.coCoTriggerDescription.price"),
#     col("coco.coCoTriggerDescription.ratio"),
#     col("coco.coCoTriggerDescription.startDate"),
#     col("coco.coCoTriggerDescription.type").alias("trigger_type"),
# ).drop("coco")

# flattened_df.show()
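
# A sketch of persisting the flattened result as a Delta table, assuming the delta-spark
# package is available on this Spark session; the table name below is illustrative. At
# roughly 100,000 rows the data is small, so writing a single output file avoids the
# small-file problem and partitioning is usually unnecessary at this size.
# (
#     flattened_df
#     .coalesce(1)  # tiny table: one output file keeps reads cheap
#     .write
#     .format("delta")
#     .mode("overwrite")
#     .saveAsTable("instrument_coco_flattened")  # illustrative name
# )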

# Write a query to display all of the unique values for `code` in the `organization_type` column.
# df.select("organization_type.code").distinct().show()


# Add a new column "organization_type" to the Instrument table that extracts the value of "type"
# from within the organization_type struct.

# newCol_df = df.withColumn("organization_type", col("organization_type.type"))
# # note: reusing the existing name replaces the struct column with the extracted string
#
# newCol_df.show()

# Write a mini-pipeline that extracts ratings by provider (S&P, Fitch, Moody's) from
# `instrument_rating`, maps each rating to the weight in the table below, and averages the three
# ratings per ISIN (International Securities Identification Number).

ratings_weights = [("AAA", "Aaa", 200),
                   ("AA+", "Aa1", 190),
                   ("AA", "Aa2", 180),
                   ("AA-", "Aa3", 170),
                   ("A+", "A1", 160),
                   ("A", "A2", 150),
                   ("A-", "A3", 140),
                   ("BBB+", "Baa1", 130),
                   ("BBB", "Baa2", 120),
                   ("BBB-", "Baa3", 110),
                   ("BB+", "Ba1", 100),
                   ("BB", "Ba2", 90),
                   ("BB-", "Ba3", 80),
                   ("B+", "B1", 70),
                   ("B", "B2", 60),
                   ("B-", "B3", 50),
                   ("CCC+", "Caa1", 40),
                   ("CCC", "Caa2", 40),
                   ("CCC-", "Caa3", 40),
                   ("CC", "Ca", 30),
                   ("C", "C", 20),
                   ("D", "D", 10)]

ratingsWeights_df = spark.createDataFrame(ratings_weights, ["S&P", "Moodys/Fitch", "Weight"])

ratingsWeights_df.show()
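
# The challenge above has no worked answer in this file, so here is a minimal sketch of one
# possible pipeline. It assumes each ISIN appears up to three times in `df` (one row per
# provider) and that the first column of the weight mapping covers S&P symbols while the
# second covers Moody's/Fitch symbols; `weights_long`, `ratings_df`, and the other names
# below are illustrative choices, not part of the original challenge.

# Reshape the weight mapping so every rating symbol, from either agency scale, maps to one weight.
weights_long = (
    ratingsWeights_df
    .select(col("S&P").alias("rating"), col("Weight").alias("weight"))
    .union(ratingsWeights_df.select(col("Moodys/Fitch").alias("rating"), col("Weight").alias("weight")))
    .dropDuplicates(["rating"])
)

# 1) Extract ISIN, provider, and rating from the nested instrument_rating struct.
ratings_df = (
    df.select(
        "isin",
        col("instrument_rating.provider").alias("provider"),
        col("instrument_rating.rating").alias("rating"),
    )
    .where(col("provider").isin("S&P", "Fitch", "Moody's"))
)

# 2) Map each rating to its numeric weight; broadcast the tiny lookup table to avoid a shuffle.
weighted_df = ratings_df.join(broadcast(weights_long), on="rating", how="left")

# 3) Average the rating weights per ISIN.
avg_rating_df = weighted_df.groupBy("isin").agg(avg("weight").alias("avg_rating_weight"))

avg_rating_df.show()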