Skip to content

You've recently started a new position as a Data Engineer at an energy company. Previously, analysts on other teams had to manually retrieve and clean data every quarter to understand changes in the sales and capability of different energy types. This process normally took days and was something that most analysts dreaded. Your job is to automate this process by building a data pipeline. You'll write this data pipeline to pull data each month, helping to provide more rapid insights and free up time for your data consumers.

You will achieve this using the pandas library and its powerful parsing features. You'll be working with two raw files: `electricity_sales.csv` and `electricity_capability_nested.json`.

Below, you'll find a data dictionary for the electricity_sales.csv dataset, which you'll be transforming in just a bit. Good luck!

| Field | Data Type |
| --- | --- |
| period | str |
| stateid | str |
| stateDescription | str |
| sectorid | str |
| sectorName | str |
| price | float |
| price-units | str |
import json
import logging
import os

import pandas as pd

# Configure the root logger: INFO and above, prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

def extract_tabular_data(file_path: str) -> pd.DataFrame:
    """Read a tabular file into a DataFrame, picking the reader by extension.

    Args:
        file_path: Path to the input file. Must end in ``.csv`` or
            ``.parquet``.

    Returns:
        The contents of the file as a ``pandas.DataFrame``.

    Raises:
        ValueError: If ``file_path`` ends in neither ``.csv`` nor
            ``.parquet``.
        Exception: Any error raised by the pandas reader is logged and
            re-raised unchanged.
    """
    try:
        logging.info(f"Extrayendo archivo tabular {file_path}")
        if file_path.endswith(".csv"):
            df = pd.read_csv(file_path)
        elif file_path.endswith(".parquet"):
            df = pd.read_parquet(file_path)
        else:
            # ValueError (still an Exception subclass) tells callers the
            # argument itself is wrong, not the I/O.
            raise ValueError(
                f"Unsupported file type: {file_path!r}. "
                "Please use a .csv or .parquet file."
            )
        logging.info(f"Extracción completada filas:{len(df)}")
        return df
    except Exception as e:
        # Log at the boundary, then let the caller decide how to recover.
        logging.error(f"Error en extracción tabular {e}")
        raise

def extract_json_data(file_path) -> pd.DataFrame:
    """Load a JSON file and flatten it into a DataFrame.

    The parsed document is passed to ``pandas.json_normalize``, which expands
    nested objects into dot-separated column names.

    Args:
        file_path: Path to the JSON file to read.

    Returns:
        A ``pandas.DataFrame`` built from the normalized JSON content.

    Raises:
        Exception: Any error while opening, parsing or normalizing the file
            is logged and re-raised unchanged.
    """
    try:
        logging.info(f"Extrayendo archivo JSON {file_path}")
        # JSON text is UTF-8; do not depend on the platform's default
        # locale encoding (which differs on e.g. Windows).
        with open(file_path, "r", encoding="utf-8") as json_file:
            raw_data = json.load(json_file)
        df = pd.json_normalize(raw_data)
        logging.info(f"JSON cargado correctamente filas:{len(df)}")
        return df
    except Exception as e:
        logging.error(f"Error en extracción JSON {e}")
        raise

def transform_electricity_sales_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """
    Clean electricity sales records for the residential and transportation sectors.

    Steps performed:
    - Drop records with NA values in the `price` column. This is done in
      place, so the caller's `raw_data` loses those rows too.
    - Keep only records whose `sectorName` is "residential" or "transportation".
    - Create a `year` column from the first 4 characters of `period`.
    - Create a `month` column from the characters of `period` starting at
      index 5 (assumes `period` looks like "YYYY-MM" — confirm against the data).
    - Return a new `DataFrame` with only the columns `year`, `month`,
      `stateid`, `price` and `price-units`.

    Raises:
        Exception: Any error during the transformation (for example a missing
            column) is logged and re-raised unchanged.
    """
    try:
        logging.info("Transformando electricity_sales_data")
        raw_data.dropna(subset=["price"], inplace=True)

        sector_mask = raw_data["sectorName"].isin(["residential", "transportation"])
        # Take an explicit copy: adding columns to a filtered slice triggers
        # SettingWithCopyWarning and leaves it ambiguous which frame is modified.
        cleaned_df = raw_data.loc[sector_mask, :].copy()

        cleaned_df["year"] = cleaned_df["period"].str[0:4]
        cleaned_df["month"] = cleaned_df["period"].str[5:]

        cleaned_df = cleaned_df.loc[:, ["year", "month", "stateid", "price", "price-units"]]
        logging.info("Transformación completada")
        return cleaned_df
    except Exception as e:
        logging.error(f"Error en transformación {e}")
        raise

def load(dataframe: pd.DataFrame, file_path: str) -> None:
    """Write a DataFrame to disk as CSV or Parquet, chosen by file extension.

    Args:
        dataframe: The data to persist.
        file_path: Destination path. ``.csv`` files are written without the
            index column; ``.parquet`` files use ``DataFrame.to_parquet``
            with its default settings.

    Raises:
        ValueError: If ``file_path`` ends in neither ``.csv`` nor
            ``.parquet``. Nothing is written in that case.
        Exception: Any error raised by the pandas writer is logged and
            re-raised unchanged.
    """
    try:
        logging.info(f"Cargando archivo {file_path}")
        if file_path.endswith(".csv"):
            dataframe.to_csv(file_path, index=False)
        elif file_path.endswith(".parquet"):
            dataframe.to_parquet(file_path)
        else:
            # ValueError is still an Exception, so existing handlers keep working.
            raise ValueError(f"Warning: {file_path} is not a valid file type. Please try again!")
        logging.info("Archivo guardado correctamente")
    except Exception as e:
        logging.error(f"Error al guardar {file_path} {e}")
        raise

# Ready for the moment of truth? It's time to test the functions that you wrote!
def validate(path) -> None:
    """Check that a file exists at ``path`` and log the outcome.

    Relies on the module-level ``import os``.

    Args:
        path: Filesystem path expected to exist.

    Raises:
        FileNotFoundError: If nothing exists at ``path``.
        Exception: Any other error raised while checking the path is logged
            and re-raised unchanged.
    """
    try:
        if not os.path.exists(path):
            raise FileNotFoundError(f"Archivo no encontrado {path}")
        logging.info(f"Validación exitosa {path}")
    except Exception as e:
        # Record the failure before propagating so it shows up in the run log.
        logging.error(e)
        raise

# Run the pipeline end to end: extract -> transform -> load -> validate.
logging.info("Ejecución de pipeline")

# Extract both raw sources.
raw_electricity_capability_df = extract_json_data(
    file_path="electricity_capability_nested.json"
)
raw_electricity_sales_df = extract_tabular_data(
    file_path="electricity_sales.csv"
)

# Transform: only the sales data is cleaned; capability data is loaded as extracted.
cleaned_electricity_sales_df = transform_electricity_sales_data(
    raw_data=raw_electricity_sales_df
)

# Load both outputs to disk.
load(
    dataframe=raw_electricity_capability_df,
    file_path="loaded__electricity_capability.parquet",
)
load(
    dataframe=cleaned_electricity_sales_df,
    file_path="loaded__electricity_sales.csv",
)

# Validate that each output file was actually written.
validate(path="loaded__electricity_capability.parquet")
validate(path="loaded__electricity_sales.csv")

logging.info("Ejecución finalizada")