
Scheduling data

  • applies to any task in a data processing pipeline
  • holds each piece of work and organizes how they fit together
  • runs tasks in a specific order and resolves all dependencies
3 ways of scheduling data: manual, time-based, and sensor scheduling (see the sketch below)
  1. manually, e.g., update the employee table on demand
  2. automatically run at a specific time, e.g., at 6 AM
  3. automatically run if a specific condition is met (sensor scheduling), e.g., if a new employee was added
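A minimal plain-Python sketch of the three trigger styles; the function name and the employee_added flag are hypothetical, just to make the idea concrete (a real setup would use a scheduler such as cron or Airflow):

def update_employee_table():
    print("employee table updated")

# 1. manual: a person runs the task on demand
update_employee_table()

# 2. time-based: run when the clock hits a chosen time
from datetime import datetime
if datetime.now().hour == 6:
    update_employee_table()

# 3. sensor-based: run only when a condition is observed
employee_added = True  # hypothetical flag set by the source system
if employee_added:
    update_employee_table()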
Batches and streams
  • Batches (often cheaper): group records and send them at intervals
  • Streams: send individual records on right away (see the sketch below)
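A rough sketch of the difference; the send() function and the record list are hypothetical, not a real streaming API:

records = ["rec1", "rec2", "rec3", "rec4", "rec5"]

def send(payload):
    print("sending", payload)

# batch: group records and send them at intervals
batch_size = 2
batch = []
for record in records:
    batch.append(record)
    if len(batch) == batch_size:
        send(batch)   # one call per group of records
        batch = []
if batch:
    send(batch)       # flush the leftover records

# stream: send each record right away
for record in records:
    send(record)      # one call per record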
Scheduling tools
  • Apache Airflow
  • Luigi

Parallel computing

  • basis of modern data processing tools
  • necessary: mainly because of memory and also for processing power
  • How it works:
    • Split tasks up into several smaller subtasks
    • Distribute these subtasks over several computers
benefits and risks (employees <=> processing units)
  • advantages: extra processing power, reduce memory footprint
  • disadvantages: moving data incurs a cost, communication time

Cloud computing

for data processing
  • Servers on premises: 1. bought, 2. need space, 3. electrical and maintenance cost, 4. enough power for peak moments, 5. processing power unused at quieter times
  • Servers on the cloud: 1. rented, 2. don't need space, 3. use just the resources we need, 4. when we need them, 5. the closer to the user the better
for data storage
  • DB reliability: data replication
  • risks with sensitive data
Cloud services (file storage / computation / databases):
  • AWS: S3 (file storage), EC2 (computation), RDS (DB), Redshift (data warehouse)
  • Microsoft Azure: Blob Storage (file storage), Virtual Machines (computation), SQL Database (DB)
  • Google Cloud: Cloud Storage (file storage), Compute Engine (computation), Cloud SQL (DB), Datastore (NoSQL)
  • Also: IBM Cloud File Storage (file storage), Snowflake (data warehouse)
Multicloud
  • Pros: 1. reducing reliance on a single vendor, 2. cost-efficiencies, 3. local laws requiring certain data to be physically present within the country, 4. mitigating against disasters
  • Cons: 1. cloud providers try to lock in consumers, 2. incompatibility, 3. security and governance
# Case 1
class AstroBody:
    description = 'Natural entity in the observable universe.'
    
class Star(AstroBody):
    pass

sun = Star()
sun.description

# Case 2
for i in "programming":
    if i == "a":
        break
    print(i)
    
# Case 3: **kwargs works just like *args, but it accepts keyword arguments
def make_dict(**kwargs):
    return kwargs

make_dict(a = 1, b = 2)

# Case 4.1: a list comprehension
[i for i in range(5) if i > 2]
# Case 4.2
[i * 3 for i in range(5)]

# Case 5: 
class Planet:
    def __init__(self, name):
        self.name = name
        
v = Planet('venus')
v.name

# Case 6: *args to pass a varying number of positional arguments
# Note: 'args' just a name
def add_many(*args):
    s = 0
    for n in args:
        s += n
    print(s)
    
add_many(100, 50, 3)

# Case 7: values in 'set' cannot be duplicated
ints = set([1,1,2,3,3,3,4]) 
print(len(ints)) # 4

# Case 7: equivalent code
## option 1
vals = [25, 30, 33, 35, 40, 180]
new_vals = []
for val in vals:
    new_vals.append(val + 32)
print(new_vals)

## option 2
vals = [25, 30, 33, 35, 40, 180]
new_vals = [val + 32 for val in vals]
print(new_vals)

# Case 8: 
class Color:
    def __init__(self, rgb_value):
        self.rgb_value = rgb_value
c = Color('#00ff66')        
c.rgb_value

# Case 9: open file and close automatically
with open('hello.txt', 'w') as file:
    file.write("hello!")
print(file.closed)   # True

# Case 10: 
boardgames = [
  'Pandemic Legacy: Season 1', 
  'Terraforming Mars', 
  'Brass: Birmingham'
]
boardgames.insert(0, 'Gloomhaven')
print(boardgames) # ['Gloomhaven', 'Pandemic Legacy: Season 1', 'Terraforming Mars', 'Brass: Birmingham']

# Case 11:
f = lambda x, y: x * y
f(3, 3)  # 9

# Case 12:
def to_int(x):
    try:
        return int(x)
    except ValueError:
        print('')
        
to_int()  # TypeError: missing required argument 'x' (a TypeError is not caught by the ValueError handler)

# Case 13:
def point(x, y):
    return x, y
point(0, 1) # (0, 1)

Data Engineering

  1. what
  2. tools
  3. ETL (extract-transform-load)
  • gather data from different sources
  • optimize the DB for analysis
  • remove corrupt data

Job Desc.

  • develop, construct, test, and maintain architectures such as DBs and large-scale processing systems (1. processing large amounts of data, 2. using clusters of machines)
D.E. vs. D.S.
  • D.E.: develop scalable data architecture; streamline data acquisition; set up processes to bring together data; clean corrupt data; well versed in cloud technology
  • D.S.: mine data for patterns; statistical modeling; predictive models using ML; monitor business processes; clean outliers in data

Example of a D.E. problem: as the company grows, there are unmistakably some technical growing pains. As the D.E., you observe some problems and have to decide where you're best suited to be of help.

Answer: the D.S. is querying the online store DB directly and slowing down the functioning of the app. The D.E. should make sure there's a separate DB for analytics.

Best practices

Docstrings

Google style - desc, args, return, errors, notes
def function(arg_1, arg_2=42):
    """Description of what the function does.

    Args:
        arg_1 (str): Description of arg_1.
        arg_2 (int, optional): Write optional when an arg has a default value.

    Returns:
        bool: Optional description of the return value.

    Raises:
        ValueError: Include any error types that the function intentionally raises.

    Notes:
        See ... for more info.
    """
Numpydoc style
def function(arg_1, arg_2=42):
    """Description of what the function does.

    Parameters
    ----------
    arg_1 : str
        Description of arg_1.
    arg_2 : int, optional
        Write optional when an arg has a default value. Default=42.

    Returns (or Yields if this function is a generator)
    -------
    bool
        Optional description of the return value.
    """
Retrieving docstrings
# option 1
print(the_answer.__doc__)

# option 2
import inspect
print(inspect.getdoc(the_answer))

DRY and "Do One Thing"

DRY: don't repeat yourself

  • use functions to avoid repetition

Do One Thing

  • every function should have a single responsibility
  • advantages: 1. more flexible, 2. more easily understood, 3. simpler to test, 4. simpler to debug, 5. easier to change

Code smells: indications that you may need to refactor

  • Refactoring: the process of improving code by changing it a little bit at a time (Martin Fowler's book)
# this function violates another Software Engineering principle: Do One Thing
def load_and_plot(path):
    # load a dataset
    ...
    # plot
    ...
    return data
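One possible way to split it so each function has a single responsibility (a sketch; it assumes the dataset is a CSV and uses pandas, which the original snippet doesn't specify):

import pandas as pd

def load_data(path):
    """Load a dataset (assumed here to be a CSV file)."""
    return pd.read_csv(path)

def plot_data(data):
    """Plot the dataset's numeric columns as histograms."""
    data.hist()

# the caller composes the two single-purpose functions:
# data = load_data('my_dataset.csv')
# plot_data(data)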

Pass by assignment

def foo(x):
    x[0] = 99

my_list = [1, 2, 3]
foo(my_list)
print(my_list)  # [99, 2, 3] because my_list and x point to the same object in memory

def bar(x):
    x = x + 90

my_var = 3
bar(my_var)
print(my_var)  # 3 because in Python, integers are immutable (can't be changed):
               # inside bar, x becomes 93, but my_var isn't touched in memory

# Case - mutable default arguments are dangerous!
# Bad!!!
def foo(var=[]):
    var.append(1)
    return var

foo()  # [1]
foo()  # [1, 1]

# Good!!!
def foo(var=None):
    if var is None:
        var = []
    var.append(1)
    return var

foo()  # [1]
foo()  # [1]
Immutable vs. mutable types
  • Immutable: int, float, bool, string, bytes, tuple, frozenset, None
  • Mutable: list, dict, set, bytearray, objects, functions, almost everything else!

A context manager

Context manager <=> caterer for a party
  • set up a context <=> set up the tables with food and drink
  • run your code <=> let you and your friends have a party
  • remove the context <=> clean up and remove the tables
# The "open()" function is a context manager with open('my_file.txt') as my_file: text = my_file.read() length = len(text) # 'print' is outside of the context, so by the time it runs the file is closed print(''.format(length)) # Step 1: "with open()": open a file to read/write - set up a context by opening a file # Step 2: lets you run any code you want on that file # Step 3: remove the context by closing the file

Template:

# 'as <variable-name>' isn't necessary if the context manager doesn't return a value
with <context-manager>(<args>) as <variable-name>:
    # run your code here
    # this code is running "inside the context"

# this code runs after the context is removed

Writing context managers

# Template
import contextlib

@contextlib.contextmanager   # add the decorator
def my_context():
    # add any set-up code you need (optional)
    yield                    # yield returns a value (or None)
    # add any teardown code you need (optional)

# example - 'yield'
@contextlib.contextmanager
def my_context():
    print('hello')
    yield 42
    print('goodbye')

with my_context() as foo:
    print('foo is {}'.format(foo))
# hello
# foo is 42
# goodbye

# example - setup and teardown
@contextlib.contextmanager
def database(url):
    db = postgres.connect(url)   # set up
    yield db
    db.disconnect()              # tear down

url = 'http://datacamp.com/data'
with database(url) as my_db:
    course_list = my_db.execute('SELECT * FROM courses')

# yielding a value or None
import os

@contextlib.contextmanager
def in_dir(path):
    # save the current working dir
    old_dir = os.getcwd()
    # switch to the new working dir
    os.chdir(path)
    yield
    # back to the previous working dir
    os.chdir(old_dir)

with in_dir('/data/proj'):
    proj_files = os.listdir()
Nested contexts
def copy(src, dst):
    """desc

    Args:
        src (str): desc
        dst (str): desc
    """
    # open and read
    with open(src) as f_src:
        contents = f_src.read()
    # open and write out
    with open(dst, 'w') as f_dst:
        f_dst.write(contents)

# Problem: this approach works fine until you try to copy a file that is too large to fit in memory
# Ideal solution: open both files at once and copy over one line at a time
def copy(src, dst):
    """desc

    Args:
        src (str): desc
        dst (str): desc
    """
    # open both files
    with open(src) as f_src:
        with open(dst, 'w') as f_dst:
            # read and write each line, one at a time
            for line in f_src:
                f_dst.write(line)
Handling errors
try:
    # code
except:
    # do something about the error
finally:
    # this code runs no matter what

@contextlib.contextmanager
def get_printer(ip):
    p = connect_to_printer(ip)
    try:
        yield
    finally:
        p.disconnect()
        print('disconnected from printer')

doc = {'text': 'This is my text.'}
with get_printer('10.0.34.111') as printer:
    printer.print_page(doc['txt'])  # causes an error, as the key 'txt' doesn't exist
Context manager patterns
  • open, lock, change, enter, start, setup, connect
  • close, release, reset, exit, stop, teardown, disconnect
Context manager use cases:
  • Bad: a function that prints all of the prime numbers (2-n)
  • Good: 1. a function that starts a timer that keeps track of how long some block of code takes to run, or 2. a function that connects to a smart ... so that it can be programmed remotely, or 3. a function that prevents multiple users from updating an online spreadsheet at the same time by locking access to the spreadsheet before every operation.

Decorators (a way of modifying the behavior of functions)

Functions are objects
# Case 1: Python objects - the name x can be bound to any kind of object
def x():
    pass

x = [1, 2, 3]   # or {'foo': 42}, pandas.DataFrame(), '...', 71.2
import x        # modules are objects too

# Case 2: functions as variables
def my_function():
    print('Hello')

x = my_function
type(x)   # <type 'function'>
x()       # Hello

Printface = print
Printface('Python is awesome')  # Python is awesome

# Case 3: lists and dictionaries of functions
list_of_functions = [my_function, open, print]
list_of_functions[2]('heo con')  # heo con

dict_of_functions = {
    'f1': my_function,
    'f2': open,
    'f3': print
}
dict_of_functions['f3']('heo con')  # heo con

# Case 4: referencing a function
def my_function():
    return 42

x = my_function   # assign the function object to x
my_function()     # 42 (calling it)
my_function       # the function object itself (referencing it)

# Case 5: functions as arguments
def has_docstring(func):
    """desc

    Args:
        func (callable): a function

    Returns:
        bool
    """
    return func.__doc__ is not None

def no():
    # has no docstring
    return 42

def yes():
    """Return the value 42"""
    return 42

has_docstring(no)   # False
has_docstring(yes)  # True

# Case 6: define a function inside another function
def foo():
    x = [3, 6, 9]

    # inner/helper/child function
    def bar(y):
        print(y)

    for value in x:
        bar(value)

# or, equivalently, use a helper to simplify a repeated condition:
# instead of
#   if x > 4 and x < 10 and y > 4 and y < 10:
#       print(x * y)
def in_range(v):
    return v > 4 and v < 10

if in_range(x) and in_range(y):
    print(x * y)

# Case 7: functions as return values
def get_function():
    def print_me(s):
        print(s)
    return print_me

new_func = get_function()
new_func('This is a sentence')
Scope

Scope determines which variables can be accessed at different points in your code

Nonlocal variables are variables defined in the parent function that are used by the child function

Scope lookup order: Local -> Non-local -> Global -> Built-in (Python checks the innermost scope first)

# Case 1 - local
x = 7
def foo():
    x = 42
    print(x)

foo()     # 42
print(x)  # 7

# Case 2 - global
x = 7
def foo():
    global x
    x = 42
    print(x)

foo()     # 42
print(x)  # 42

# Case 3 - non-local
def foo():
    x = 10
    def bar():
        x = 200
        print(x)
    bar()
    print(x)

foo()  # 200 10

def foo():
    x = 10
    def bar():
        nonlocal x
        x = 200
        print(x)
    bar()
    print(x)

foo()  # 200 200

Closures

A closure is a tuple of variables that are no longer in scope, but that a function needs in order to run

def foo():
    a = 5
    def bar():
        print(a)
    return bar

# Case 1 - closure
func = foo()
func()  # 5
type(func.__closure__)             # <class 'tuple'>
len(func.__closure__)              # 1
func.__closure__[0].cell_contents  # 5

# Case 2 - closures and deletion
x = 25
def foo(value):
    def bar():
        print(value)
    return bar

my_func = foo(x)
my_func()  # 25
del(x)
my_func()  # 25, because the value is still cached in the function's closure
len(my_func.__closure__)              # 1
my_func.__closure__[0].cell_contents  # 25

# Case 3 - closures and overwriting
x = 25
x = foo(x)
x()  # 25
len(x.__closure__)              # 1
x.__closure__[0].cell_contents  # 25

# Closure: nonlocal variables attached to a returned function
def parent(arg_1, arg_2):
    value = 22
    my_dict = {'chocolate': 'yummy'}
    def child():
        print(2 * value)
        print(my_dict['chocolate'])
        print(arg_1 + arg_2)
    return child

new_function = parent(3, 4)
print([cell.cell_contents for cell in new_function.__closure__])
# [3, 4, 22, {'chocolate': 'yummy'}]

Decorators

Decorators use:

  • functions as objects
  • nested functions
  • nonlocal scope
  • closures

A decorator (@) is a wrapper placed around a function that changes that function's behavior without changing the code of the function itself

# @double_args doubles every argument before calling the function
@double_args
def multiply(a, b):
    return a * b

multiply(1, 5)  # 20

# equivalent to
def multiply(a, b):
    return a * b

def double_args(func):
    def wrapper(a, b):
        return func(a * 2, b * 2)
    return wrapper

new_multiply = double_args(multiply)
new_multiply(1, 5)  # 20
Real-world examples
# example 1 - helpful to find the slow parts of your code
import time

def timer(func):
    """A decorator that prints how long a function took to run.

    Args:
        func (callable): the function being decorated

    Returns:
        callable: the decorated function
    """
    # define the wrapper function to return
    def wrapper(*args, **kwargs):
        t_start = time.time()
        result = func(*args, **kwargs)
        t_total = time.time() - t_start
        print('{} took {}s'.format(func.__name__, t_total))
        return result
    return wrapper

@timer
def sleep_n_seconds(n):
    time.sleep(n)

sleep_n_seconds(5)  # sleep_n_seconds took 5.000509s

# example 2
def memoize(func):
    """Store the results of the decorated function for fast lookup."""
    # store results in a dict that maps arguments to results
    cache = {}
    def wrapper(*args, **kwargs):
        # kwargs is a dict (unhashable), so build a hashable cache key from it
        key = (args, tuple(kwargs.items()))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]
    return wrapper

@memoize
def slow_function(a, b):
    print('Sleeping...')
    time.sleep(5)
    return a + b

slow_function(3, 4)  # Sleeping... 7
slow_function(3, 4)  # 7
# no 'Sleeping...' the second time: the answer is in the cache,
# so the decorated function doesn't even have to call the original slow_function
When to use decorators:
  • when you want to add some common bit of code to multiple functions (without violating the principle of Don't Repeat Yourself)
def counter(func):
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

@counter
def foo():
    print('calling foo()')

foo()
print('foo was called {} times'.format(foo.count))
Decorators and metadata

Problem: decorators obscure the decorated function's metadata

# case - normal
def sleep_n_seconds(n=10):
    """desc. is here

    Args:
        n (int): ...
    """
    time.sleep(n)

print(sleep_n_seconds.__doc__)  # desc. is here ...

# case - weird
@timer
def sleep_n_seconds(n=10):
    """desc. is here...
    ...
    """
    time.sleep(n)

print(sleep_n_seconds.__doc__)  # empty!!!
# to understand, we have to examine the timer decorator

# Case - the nested function wrapper() didn't have a docstring
def timer(func):
    """A decorator ..."""
    def wrapper(*args, **kwargs):
        ...
        result = func(*args, **kwargs)
        ...
        return result
    return wrapper

# Solution - use @wraps() when defining a decorator
from functools import wraps

def timer(func):
    """A decorator ..."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        ...
        result = func(*args, **kwargs)
        return result
    return wrapper

DE Intro

Tools

Processing

  • clean data
  • aggregate data
  • join data
### Processing - an example
df = spark.read.parquet("users.parquet")
outliers = df.filter(df["large"] > 100)
print(outliers.count())

Scheduling

  • plan jobs with specific intervals
  • resolve dependency requirements of jobs

Tools

  • DBs: MySQL, PostgreSQL
  • Processing: Spark, Hive
  • Scheduling: Airflow, Oozie
D.E Toolbox

SQL: Star schema (consists of one or more fact tables referencing any number of dimension tables)

  • Facts: things that happened (e.g., Product Orders)
  • Dimensions: info. on the world (e.g., Customer Info.)
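A minimal sketch of what such a schema could look like; the table and column names are hypothetical, written as SQL inside a Python string to match the query style used below:

star_schema_ddl = """
-- dimension table: info on the world (Customer Info)
CREATE TABLE dim_customer (
    customer_id INT PRIMARY KEY,
    name        TEXT,
    country     TEXT
);

-- fact table: things that happened (Product Orders), referencing the dimension
CREATE TABLE fact_order (
    order_id    INT PRIMARY KEY,
    customer_id INT REFERENCES dim_customer (customer_id),
    amount      NUMERIC,
    ordered_at  TIMESTAMP
);
"""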
data = pd.read_sql("""
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id" = "Customer"."id"
""", db_engine)

# show the id column of data
print(data.id)
Idea behind parallel computing

Basis of modern data processing tools

  • memory
  • processing power

Idea

  • split task into subtasks
  • distribute subtasks over several computers
  • work together to finish task

Risks

  • parallel slowdown: overhead due to communication, so the task needs to be large enough to justify several processing units
#### Example 1
### multiprocessing.Pool (API to distribute work over several cores on the same machine)
from multiprocessing import Pool

# a tuple: the year of the group and the group itself
def take_mean_age(year_and_group):
    year, group = year_and_group
    # one observation and one column: the mean age of the group
    return pd.DataFrame({"Age": group["Age"].mean()}, index=[year])

# take this function and map it over the groups generated by groupby()
with Pool(4) as p:
    # the mapping runs in 4 separate processes and uses 4 cores
    results = p.map(take_mean_age, athlete_events.groupby("Year"))

result_df = pd.concat(results)

#### Example 2
### the dask framework offers a DataFrame object, which performs the groupby and apply
### dask uses lazy evaluation and multiprocessing out of the box
import dask.dataframe as dd

# partition the dataframe into 4
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)

# run parallel computations on each partition
result_df = athlete_events_dask.groupby('Year').Age.mean().compute()
Parallel computation frameworks
  1. Apache Hadoop
    • HDFS: a distributed file system
      • Now, replaced by cloud-managed storage systems like Amazon S3
    • MapReduce: splits tasks into subtasks -> processing units (computers in the cluster)
      • Flaws: hard to write these MapReduce jobs
        • Solution: Hive - a layer on top of the Hadoop ecosystem that makes data from sources queryable in a structured way using Hive SQL
### Hive - an example
### The query is transformed into a job that can operate on a cluster of computers
SELECT year, AVG(age)
FROM views.athlete_events
GROUP BY year
  2. Apache Spark: distributes data processing tasks between clusters of computers
    • while MapReduce-based systems tend to need expensive disk writes between jobs, Spark keeps as much processing as possible in memory
    • Note: the disk writes of MapReduce were limiting in interactive EDA (exploratory data analysis), where each step builds on top of a previous step
    • Resilient distributed datasets (RDD):
      • a data structure that maintains data which is distributed between multiple nodes
      • unlike DataFrames, RDDs don't have named columns
      • from a conceptual perspective, think of RDDs as lists of tuples
      • Transformations (result in transformed RDDs): .map() or .filter()
      • Actions (result in a single result): .count() or .first()
    • PySpark
      • hosts a DataFrame abstraction <=> do operations very similar to pandas DataFrames
# inspect athlete_events_spark (a PySpark DataFrame)
athlete_events_spark.groupBy('Year').mean('Age').show()
athlete_events_spark.printSchema()
type(athlete_events_spark)

# running PySpark files
# for the sake of this exercise, work with a local Spark instance running on 4 threads
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    athlete_events_spark = (spark
        .read
        .csv("/home/repl/datasets/athlete_events.csv",
             header=True,
             inferSchema=True,
             escape='"'))

    athlete_events_spark = (athlete_events_spark
        .withColumn("Height", athlete_events_spark.Height.cast("integer")))

    print(athlete_events_spark
        .groupBy('Year')
        .mean('Height')
        .orderBy('Year')
        .show())

# run the file (on a local Spark instance running on 4 threads)
spark-submit \
    --master local[4] /home/repl/spark-script.py
Workflow scheduling frameworks

An example pipeline: CSV --extract-->Apache Spark--load-->SQL (return 'clean' to Spark)

Q: how to schedule?

  • manually
  • [linux] 'cron' scheduling tool
  • these 2 options don't work if there are (1) one job for the CSV file, (2) another job to pull in and clean the data from an API, and (3) a third job that joins the data from the CSV and the API together. [Dependencies] The 3rd job depends on the first 2 jobs finishing
    • so we need a more holistic approach

DAGs (directed acyclic graphs) visualize these dependencies

  • a set of nodes that are connected by directed edges
  • no cycles in the graph <=> no path following the directed edges visits a specific node more than once (see the sketch below)
  • Job A --> Job B ----> [Job C] and [Job D --> Job E]
  • the jobs represented by the DAG can then run in a daily schedule, for example
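To make the "no cycles" property concrete, here is a small sketch that represents the example jobs above as a dictionary of edges and checks them with a depth-first search; all names are illustrative:

# adjacency list for: Job A --> Job B, then Job B --> Job C and Job B --> Job D --> Job E
dag = {
    "Job A": ["Job B"],
    "Job B": ["Job C", "Job D"],
    "Job C": [],
    "Job D": ["Job E"],
    "Job E": [],
}

def has_cycle(graph):
    visiting, done = set(), set()

    def visit(node):
        if node in done:
            return False
        if node in visiting:   # back on a node of the current path -> cycle
            return True
        visiting.add(node)
        if any(visit(child) for child in graph[node]):
            return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(node) for node in graph)

print(has_cycle(dag))  # False, so these jobs form a valid DAG and can be scheduled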

Tools for the job

  • Individual: Linux's cron
  • Company - a more full-fledged solution: Spotify's Luigi - allows for the definition of DAGs for complex pipelines
  • Apache Airflow - the de-facto workflow scheduling framework (using Python)

Airflow - an example DAG: start_cluster ----> ingest_customer_data/ingest_product_data ----> enrich_customer_data

  • 1st job: start a Spark cluster
  • 2nd job: pull in customer and product data
  • 3rd job: aggregate both tables
# Create the DAG object
# "N * * * *": every hour at minute N
# e.g. dag = DAG(dag_id="car_factory_simulation",
#                default_args={"owner": "airflow",
#                              "start_date": airflow.utils.dates.days_ago(2)},
#                schedule_interval="N * * * *")
dag = DAG(dag_id="example_dag", ..., schedule_interval="0 * * * *")

# Define operations
# Operator: defines each of the jobs
start_cluster = StartClusterOperator(task_id="start_cluster", dag=dag)
ingest_customer_data = SparkJobOperator(task_id="ingest_customer_data", dag=dag)
ingest_product_data = SparkJobOperator(task_id="ingest_product_data", dag=dag)
enrich_customer_data = PythonOperator(task_id="enrich_customer_data", ..., dag=dag)

# Set up the dependency flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)
ETL (Extract, Transform, Load)
Extract
  • various sources

  • persistent storage:

    • not suited for data processing
    • a file on Amazon S3 or a SQL database
  • extracting data from persistent storage into memory

  • extract from text files:

    • Unstructured: plain text
    • Flat files: row = record; column = attribute
    • JSON:
      • JavaScript Object Notation
      • Semi-structured
      • Atomic: number, string, boolean, null
      • Composite: array, object
import json

result = json.loads('{"key_1": "value_1", "key_2": "value_2"}')
print(result["key_1"])

Data on the Web

  • request page
  • response with data

Data on the Web through APIs:

  • APIs mostly use a more structured form of data
  • send data in JSON format
import requests

response = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
# the response parsed as JSON
print(response.json())

post_score = response.json()["score"]
print(post_score)

Data in databases

  • App. DBs

    • transactions
    • inserts or changes
    • OLTP (online transaction processing) - system is optimized for transactions
    • row-oriented
  • Analytical DBs

    • OLAP (online analytical processing)
    • column-oriented

Extraction from DBs

import sqlalchemy

# postgresql://[user[:password]@][host][:port]/[database]
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)

import pandas as pd
pd.read_sql("SELECT * FROM {}".format("customer"), db_engine)
Transform

Kind of transformations

  • Translation of code values (e.g., 'New York' -> 'NY'; see the pandas sketch after the example below)
  • Data validation (e.g., checking that the data in 'created_at' is a valid date)
  • Splitting columns into multiple columns
  • Joining from multiple sources
customer_df  # Pandas DataFrame with customer data

# split the email column into 2 columns on the '@' symbol
split_email = customer_df.email.str.split("@", expand=True)
# at this point, split_email has 2 columns:
# the first with everything before @, the second with everything after @

# create 2 new columns
customer_df = customer_df.assign(
    username=split_email[0],
    domain=split_email[1],
)
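For the first kind of transformation above (translation of code values), a small pandas sketch; the state column and the mapping are made up for illustration:

# map full state names to their codes, leaving unknown values untouched
state_codes = {"New York": "NY", "California": "CA"}
customer_df = customer_df.assign(
    state=customer_df.state.replace(state_codes)
)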

Transforming in PySpark

# extract data into PySpark
import pyspark.sql

spark = pyspark.sql.SparkSession.builder.getOrCreate()

customer_df = spark.read.jdbc(
    "jdbc:postgresql://localhost:5432/pagila",
    "customer",
    properties={"user": "repl", "password": "password"},
)

# join customer_df and ratings_df
rating_per_customer = ratings_df.groupBy("customer_id").mean("rating")

customer_df_with_ratings = customer_df.join(
    rating_per_customer,
    customer_df.customer_id == rating_per_customer.customer_id,
)

print(customer_df_with_ratings.show(5))
Loading

Analytics or applications DBs

Analytics

  • aggregate queries
  • OLAP: online analytical processing
  • Queries about subset of columns
  • parallelization

Applications

  • lots of transactions
  • OLTP: online transaction processing
  • Row-oriented
  • stored per record
  • added per transaction (e.g., adding a customer is fast)
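A sketch of the difference in query shape, against a hypothetical orders table: the analytical query aggregates a few columns over many rows, while the transactional one touches a single row.

# analytical (OLAP-style): aggregate a subset of columns over many rows
analytics_query = """
SELECT customer_id, SUM(amount) AS total_spent
FROM orders
GROUP BY customer_id
"""
analytics_df = pd.read_sql(analytics_query, db_engine)

# transactional (OLTP-style): add or change one record per operation
insert_query = """
INSERT INTO orders (order_id, customer_id, amount)
VALUES (1001, 42, 9.99)
"""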

MPP DBs (massively parallel processing)

  • Nodes which are column-oriented DBs optimized for analytics, that run in a distributed fashion
    • queries are not executed on a single compute node, but rather split into subtasks and distributed among several nodes
  • Amazon Redshift, Azure SQL Data Warehouse, Google BigQuery
# Case 1 - load from file to columnar storage format
# 1. load data into Amazon Redshift (by writing files to S3 - AWS's file storage service)
# 2. send a copy query to Redshift
# connect to Redshift using a PostgreSQL connection URI
# Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem

# write the pandas df to parquet
pdf.to_parquet("./s3://path/to/bucket/customer.parquet")
# write the PySpark df to parquet
sdf.write.parquet("./s3://path/to/bucket/customer.parquet")

# copy the data from S3 into Redshift
COPY customer
FROM 's3://path/to/bucket/customer.parquet'
FORMAT AS parquet

# Case 2 - load to PostgreSQL (pandas.to_sql())
# transformation on the data
recommendations = transform_find_recommendations(ratings_df)
# load into the PostgreSQL db
recommendations.to_sql(
    "recommendations",
    db_engine,
    schema="store",
    if_exists="replace",
)

The full extent of an ETL pipeline: we extract data from DBs, transform the data to fit our needs, and load it back into a DB - the data warehouse.

### ETL function
def extract_table_to_df(tablename, db_engine):
    return pd.read_sql("SELECT * FROM {}".format(tablename), db_engine)

def split_columns_transform(df, column, pat, suffixes):
    # converts the column to str and splits it on pat ...
    ...

def load_df_into_dwh(film_df, tablename, schema, db_engine):
    return film_df.to_sql(tablename, db_engine, schema=schema, if_exists="replace")

db_engines = {...}  # needs to be configured

def etl():
    # extract
    film_df = extract_table_to_df("film", db_engines["store"])
    # transform
    film_df = split_columns_transform(film_df, "rental_rate", ".", ["_dollar", "_cents"])
    # load
    load_df_into_dwh(film_df, "film", "store", db_engines["dwh"])

Apache Airflow

  • a Python-written workflow scheduler
  • represents Directed Acyclic Graphs (DAGs) as Python objects (to manage workflows)
  • Tasks defined in operators (e.g., BashOperator to run a bash script)
    • an operator represents a unit of work

Scheduling with DAGs in Airflow

from airflow.models import DAG

dag = DAG(dag_id="sample",
          ...,
          # multiple ways of defining the interval
          # most common: a cron expression
          schedule_interval="0 0 * * *")

the DAG definition file

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id="etl_pipeline",
          schedule_interval="0 0 * * *")

etl_task = PythonOperator(task_id="etl_task",
                          python_callable=etl,
                          dag=dag)

etl_task.set_upstream(wait_for_this_task)

# saved as etl_dag.py in ~/airflow/dags/
Use case: from ratings to recommendations

1. Course ratings

  • get rating data
  • clean and calculate top-recommended courses
  • recalculate daily
### an ETL process
### data application -> cleaning -> calculate recommendations -> data warehouse
### DB tables:
###   course(course_id, title, description, programming_language)
###   rating(user_id, course_id, rating)

## Step 1: connect
# connection URI
connection_uri = "postgresql://user:pwd@localhost:5432/datacamp_app"
db_engine = sqlalchemy.create_engine(connection_uri)

usr1 = pd.read_sql("SELECT * FROM rating WHERE user_id=4387", db_engine)
usr2 = pd.read_sql("SELECT * FROM rating WHERE user_id=4381", db_engine)

# the helper function to compare the users
print_user_comparison(usr1, usr2)

## Step 2: avg. rating per course
def transform_avg_rating(rating_data):
    avg_rating = rating_data.groupby('course_id').rating.mean()
    sort_rating = avg_rating.sort_values(ascending=False).reset_index()
    return sort_rating

rating_data = extract_rating_data(db_engines)
avg_rating_data = transform_avg_rating(rating_data)

## Step 3: from ratings to recommendations
# end up with triplets of data: user id, course id, rating prediction
# rating prediction: estimate the rating the user would give the course before they even took it
# => top 3 recommended courses for each unique user id
# techniques to transform the rating table into recommendations:
#   1. matrix factorization
#   2. building Recommendation Engines with PySpark
# Recommendation transformation:
#   1. use the courses the user has rated most
#   2. don't recommend courses that the user already rated
#   3. recommend the 3 highest rated courses from the remaining combinations

# 3.1 filter out corrupt data
course_data = extract_course_data(db_engines)
print(course_data.isnull().sum())

def transform_fill_programming_language(course_data):
    imputed = course_data.fillna({"programming_language": "R"})
    return imputed

transformed = transform_fill_programming_language(course_data)
print(transformed.isnull().sum())

# 3.2 using the recommender transformation
def transform_recommendations(acr, cr):
    merged = acr.merge(cr)
    grouped = merged.sort_values("rating", ascending=False).groupby("user_id")
    recommendations = grouped.head(3).sort_values("user_id").reset_index()
    final_recommendations = recommendations[["user_id", "course_id", "rating"]]
    return final_recommendations

recommendations = transform_recommendations(avg_course_ratings, courses_to_recommend)

## so far:
## - extract using extract_course_data() and extract_rating_data()
## - clean up NAs using transform_fill_programming_language()
## - average course ratings per course: transform_avg_rating()
## - get eligible user and course id pairs: transform_courses_to_recommend()
## - calculate the recommendations: transform_recommendations()

## Step 4: scheduling daily jobs
## Loading to Postgres
## - use the calculations in data products
## - update daily
## - example use case: sending out emails with recommendations

# the loading phase
recommendations.to_sql(
    "recommendations",
    db_engine,
    if_exists="append",
)

def etl(db_engines):
    # extract the data
    courses = extract_course_data(db_engines)
    rating = extract_rating_data(db_engines)
    # clean up the courses data
    courses = transform_fill_programming_language(courses)
    # get the average course ratings
    avg_course_rating = transform_avg_rating(rating)
    # get eligible user and course id pairs
    courses_to_recommend = transform_courses_to_recommend(rating, courses)
    # calculate the recommendations
    recommendations = transform_recommendations(avg_course_rating, courses_to_recommend)
    # load the recommendations into the db
    load_to_dwh(recommendations, db_engine)

## Step 5: creating the DAG
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id="recommendations",
          schedule_interval="0 0 * * *")

task_recommendations = PythonOperator(
    task_id="recommendations_task",
    python_callable=etl,
    dag=dag,
)

Streamlined Data Ingestion with pandas

1. importing from flat files

import pandas as pd
import matplotlib.pyplot as plt

tax_data = pd.read_csv("data_2016.tsv", sep="\t")  # \t: a tab
tax_data.head(3)

# plot the total number of tax returns by income group
counts = tax_data.groupby("agi_stub").N1.sum()  # N1: the income column
counts.plot.bar()
plt.show()

## limiting columns
# - choose columns to load: usecols
# - accepts a list of column numbers or names, or a function to filter column names
col_names = ['STATEFIPS', 'STATE', 'zipcode', 'agi_stub', 'N1']
col_nums = [0, 1, 2, 3, 4]
tax_data_v1 = pd.read_csv('us_tax_data_2016.csv', usecols=col_names)
tax_data_v2 = pd.read_csv('us_tax_data_2016.csv', usecols=col_nums)

## limiting rows
tax_data_first1000 = pd.read_csv('us_tax_data_2016.csv', nrows=1000)

col_names = list(tax_data_first1000)
tax_data_next500 = pd.read_csv(
    'us_tax_data_2016.csv',
    nrows=500,
    skiprows=1000,
    header=None,
    names=col_names
)

cols = ["zipcode", "agi_stub", "mars1", "MARS2", "NUMDEP"]
data = pd.read_csv('us_tax_data_2016.csv', usecols=cols)
print(data.groupby("agi_stub").sum())

### Handling errors and missing data
## common import issues: 1. wrong column data types, 2. missing values, 3. cannot-read records

# data types
print(df.dtypes)
df = pd.read_csv("vt_data_first500.csv", dtype={"zipcode": str})
print(df.dtypes)

# missing data values
tax_data = pd.read_csv("vt_data_first500.csv", na_values={"zipcode": 0})
print(tax_data[tax_data.zipcode.isna()])

# lines with errors
# change this behavior with 2 args: error_bad_lines, warn_bad_lines
tax_data = pd.read_csv(
    "vt_data_first500.csv",
    error_bad_lines=False,  # skip unparseable records
    warn_bad_lines=True     # see messages when records are skipped
)

try:
    data = pd.read_csv("vt_tax_data_2016_corrupt.csv")
except pd.errors.ParserError:
    print("...")

2. importing from Excel files

  • data stored in tabular form, with cells arranged in rows and columns
  • have formatting and formulas
  • multiple spreadsheets in a workbook
import pandas as pd

survey_data = pd.read_excel("fcc_survey.xlsx")

survey_data = pd.read_excel(
    "fcc_survey_headers.xlsx",
    skiprows=2,
    usecols="W:AB, AR"
)

from multiple worksheets

survey_data_sheet2 = pd.read_excel("fcc_survey.xlsx", sheet_name=1)
survey_data_sheet2 = pd.read_excel("fcc_survey.xlsx", sheet_name='2017')

# read all sheets in a workbook
survey_responses = pd.read_excel("fcc_survey.xlsx", sheet_name=None)
print(type(survey_responses))  # OrderedDict

for key, value in survey_responses.items():
    print(key, type(value))  # 2016 <class 'pandas.core.frame.DataFrame'>

# create an empty dataframe to hold all loaded sheets
all_responses = pd.DataFrame()
for sheet_name, frame in survey_responses.items():
    # add a column so we know which year the data is from
    frame["Year"] = sheet_name
    # add each df to all_responses
    all_responses = all_responses.append(frame)

print(all_responses.Year.unique())

modifying imports:true/false data

bootcamp_data = pd.read_excel("fcc_survey_booleans.xlsx")
print(bootcamp_data.dtypes)

# count True values
# sum the df's columns to see how many Trues each float column has
print(bootcamp_data.sum())
# count NAs
print(bootcamp_data.isna().sum())

# load the data, casting True/False columns as Boolean
bool_data = pd.read_excel(
    "fcc_survey_booleans.xlsx",
    dtype={
        "AttendedBootcamp": bool,
        "AttendedBootcampYesNo": bool,
        "AttendedBootcampTF": bool,
        "BootcampLoan": bool,
        "LoanYesNo": bool,
        "LoanTF": bool
    })

# Problem:
# - NA/missing values in Boolean columns are changed to True
# - unrecognized values in a Boolean column are also changed to True
bool_data = pd.read_excel(
    "fcc_survey_booleans.xlsx",
    dtype={
        "AttendedBootcamp": bool,
        "AttendedBootcampYesNo": bool,
        "AttendedBootcampTF": bool,
        "BootcampLoan": bool,
        "LoanYesNo": bool,
        "LoanTF": bool},
    true_values=["Yes"],
    false_values=["No"]
)

Modifying imports: parsing dates

# datetime columns are loaded as objects (strings) by default
# list the columns of dates to parse
date_cols = ["Part1StartTime", "Part1EndTime"]

# load the file, parsing standard datetime columns
survey_df = pd.read_excel("fcc_survey.xlsx", parse_dates=date_cols)

# check the data types of the timestamp columns
print(survey_df[["Part1StartTime",
                 "Part1EndTime",
                 "Part2StartTime",
                 "Part2EndTime"]].dtypes)

date_cols = [
    "Part1StartTime",
    "Part1EndTime",
    # a new combined datetime column: Part2StartDate_Part2StartTime
    ["Part2StartDate", "Part2StartTime"]
]
survey_df = pd.read_excel("fcc_survey.xlsx", parse_dates=date_cols)

date_cols = {
    "Part1Start": "Part1StartTime",
    "Part1End": "Part1EndTime",
    "Part2Start": ["Part2StartDate", "Part2StartTime"]
}
survey_df = pd.read_excel("fcc_survey.xlsx", parse_dates=date_cols)

# view summary statistics of Part2Start
print(survey_df.Part2Start.describe())

# non-standard dates
# - parse_dates doesn't work with non-standard datetime formats
# - solution: pd.to_datetime()
# %Y %m %d %H %M %S: 1999 03 01 21 09 05
# e.g. 03292016 21:27:25
format_string = "%m%d%Y %H:%M:%S"
survey_df["Part2EndTime"] = pd.to_datetime(
    survey_df["Part2EndTime"],
    format=format_string
)

3. importing from DBs

  • SQLite DBs are computer files (just as CSVs & Excel files are), making them great for sharing data
  • Connecting to DBs is a two-step process: (SQL and pandas)
    1. create way to connect to db
    2. query db
  • using the SQLAlchemy library, which has tools to work with many major relational DBs
    • create_engine(): makes an engine to handle db connections
      • needs string URL of db to connect to
      • SQLite URL format: sqlite:///filename.db
  • querying dbs
    • pd.read_sql(query, engine): load in data from a db
import pandas as pd
from sqlalchemy import create_engine

# create a db engine to manage connections
engine = create_engine("sqlite:///data.db")

# load the entire weather table by table name
weather = pd.read_sql("weather", engine)
# load the entire weather table with SQL
weather = pd.read_sql("SELECT * FROM weather", engine)

query = """
SELECT DISTINCT borough, complaint_type
FROM weather;
"""

query = """
SELECT *
FROM weather
WHERE borough = 'BROOKLYN';
"""
weather = pd.read_sql(query, engine)
print(weather.borough.unique())
print(weather.shape)  # (2016, 8)

query = """
SELECT borough, COUNT(*)
FROM weather
WHERE complaint_type = 'PLUMBING'
GROUP BY borough;
"""

# joining and aggregating
query = """
SELECT h.borough,
       COUNT(*),
       b.total_population,
       b.housing_units
FROM happy AS h
JOIN bonny AS b
ON h.borough = b.borough
GROUP BY h.borough;
"""

SQL order of keywords: 1.SELECT, 2.FROM, 3.JOIN, 4.WHERE, 5. GROUP BY
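A query that uses all five keywords in that order, sketched against the happy/bonny tables from the block above (the WHERE condition on housing_units is made up for illustration):

query = """
SELECT h.borough, COUNT(*) AS num_rows    -- 1. SELECT
FROM happy AS h                           -- 2. FROM
JOIN bonny AS b                           -- 3. JOIN
  ON h.borough = b.borough
WHERE b.housing_units > 1000              -- 4. WHERE
GROUP BY h.borough;                       -- 5. GROUP BY
"""
result = pd.read_sql(query, engine)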

4. importing JSON data and working with APIs

  • pandas guesses how to arrange it in a table
  • pandas can automatically handle common orientations
import pandas as pd

# split: different lists for column names, indices, and values
death_causes = pd.read_json("nyc_death_causes.json", orient="split")
print(death_causes.describe())
import requests
import pandas as pd

api_url = "https://api.yelp.com/v3/businesses/search"
params = {"term": "bookstore"}
headers = {"Authorization": "Bearer {}".format(api_key)}

response = requests.get(api_url, params=params, headers=headers)

# isolate the JSON data from the response object
data = response.json()

# load the business data to a df
bookstores = pd.DataFrame(data["businesses"])
print(bookstores.head(2))

## pandas.io.json: submodule with tools for reading and writing JSON
## json_normalize():
## - record_path: string/list of string attributes to nested data
## - meta: list of other attributes to load to the dataframe
## - meta_prefix: string to prefix to the meta column names
## - takes a dictionary/list of dictionaries (like pd.DataFrame() does)
## - returns a flattened df
## - default flattened column name pattern: attribute.nestedattribute
import pandas as pd
import requests
from pandas.io.json import json_normalize

# set up headers, parameters, and API endpoint
api_url = "https://api.yelp.com/v3/businesses/search"
headers = {"Authorization": "Bearer {}".format(api_key)}
params = {
    "term": "bookstore",
    "location": "San Francisco"
}

# make the API call and extract the JSON data
response = requests.get(api_url, headers=headers, params=params)
data = response.json()

# flatten the data and load to a dataframe, with _ separators
bookstores = json_normalize(data["businesses"], sep="_")
print(list(bookstores))
print(bookstores.categories.head())

# Option 2
# flatten the categories data, bring in business details
df = json_normalize(
    data["businesses"],
    sep="_",
    record_path="categories",
    meta=[
        "name",
        "alias",
        "rating",
        # attributes nested under coordinates
        ["coordinates", "latitude"],
        ["coordinates", "longitude"]
    ],
    meta_prefix="biz_"
)

Combining multiple datasets

Appending (df1.append(df2))

  • ignore_index = True: renumber rows

Use case: adding rows from one df to another

# get the first 20 bookstore results
params = {"term": "bookstore", "location": "San Francisco"}
first_results = requests.get(api_url, headers=headers, params=params).json()
first_20_bookstores = json_normalize(first_results["businesses"], sep="_")
print(first_20_bookstores.shape)

# get the next 20 bookstores
params["offset"] = 20
next_results = requests.get(api_url, headers=headers, params=params).json()
next_20_bookstores = json_normalize(next_results["businesses"], sep="_")
print(next_20_bookstores.shape)

# put the bookstore datasets together, renumbering the rows
# ignore_index=True to relabel rows
bookstores = first_20_bookstores.append(next_20_bookstores, ignore_index=True)
print(bookstores.name)  # the name column confirms the resulting df has 40 stores

# get cafes 51 to 100
params = {
    "term": "cafe",
    "location": "NYC",
    "sort_by": "rating",
    "limit": 50,
    "offset": 50
}
result = requests.get(api_url, headers=headers, params=params)
next_50 = json_normalize(result.json()["businesses"])
cafes = top_50.append(next_50, ignore_index=True)  # top_50 holds the first 50 results

Merging

  • datasets have key column(s) with common values
  • merge(): pandas version of a SQL join
  • df.merge()
    • 2nd df to merge
    • columns to merge on
      • on: if names are the same in both dataframes
      • left_on and right_on: if key names differ
      • Key columns should be the same data type

Use case: combining datasets to add related columns

# the default merge() <=> inner join
merged = call_counts.merge(weather,
                           left_on="created_date",
                           right_on="date")

merged_2 = merged.merge(cafes, on="puma")

Importing and Cleaning

import pandas as pd

## strip leading/trailing whitespace
student['name'] = student['name'].str.strip()

## drop the missing values from a specific column
game.dropna(subset=["team"])

## fill in the missing data with the fill value 0
game.fillna(0)

## load the file using the Python context manager
with open("data.txt", mode="r") as file:
    print(file.read())

## set the column 'name' as an index
game = game.set_index('name')

## lower case
email.lower()

## drop the rows where all values are NaN
score.dropna(how='all')

## load the pickled file
x = pd.read_pickle('data.pkl')  # type(x): <class 'dict'>

## reset the index
game = game.reset_index()

## identify the missing values
import numpy as np
import pandas as pd
data.isna()

## remove the duplicate grades
s = student.drop_duplicates()

## import only the first 6 rows of the data
pd.read_csv(file_name, nrows=6)

## using bs4, make an instance of BeautifulSoup with HTML data
from bs4 import BeautifulSoup
s = BeautifulSoup(html_doc)
ps = s.prettify()

Git Intro

## to see the working location
pwd

## editing a file
nano filename.csv  # save: Ctrl+O; exit: Ctrl+X

## create or edit a file
echo "Review..." > todo.txt

## Git version
git --version

Staging and committing

  • saving a draft: staging area (like placing a letter in an envelope)
  • save files/update the repo: commit changes (like putting the envelope in a mailbox)

Git storage workflow: Working Directory (report.md) -> Staging Area (2 draft updates: staged,...) -> .git Directory (Permanent Storage) (committed-2nd, committed-5th)

Git workflow:

  1. modify a file
  2. save the draft
  3. commit the updated file
  4. repeat

# [modify] nano filename
# [modify] echo

# [draft] adding a single file:
git add report.md
# [draft] adding all modified files:
git add .  # . = all files and dirs in the current location

# [save] making a commit:
git commit -m "Log message: short and concise"

Check the status of files: git status

Comparing files

## comparing an unstaged file with the last commit
nano report.md
git add report.md
git commit -m "..."
nano report.md
git diff report.md
# @@: line changes; -: removed lines; +: added lines
# @@ -48,3 +48,4 @@
# <=> 3 lines changed starting at line 48 in the last commit
# <=> and 1 additional line has been added in our current unstaged file

git add report.md
# add all files to the staging area
git add .

# git diff -r: look at a particular revision of the file
# git diff -r won't work if it isn't followed by a commit reference such as HEAD
# HEAD: the most recent commit
git diff -r HEAD report.md

## comparing multiple staged files with the last commit
git diff -r HEAD  # show the diff between all files in the staging area

### Recap:
## 1. compare an unstaged file with the last committed version
git diff filename
## 2. compare a staged file with the last committed version
git diff -r HEAD filename
## 3. compare all staged files with the last committed version
git diff -r HEAD

Storing data with Git

  1. Visualizing the commit structure:

Commit (1st commit - [hash such as 2d135de]) --> Tree (file1, file2) --> Blob (content_changed_1, content_changed_2)

Commit (2nd commit - 2f423f2) --> Tree (file1, file2, file3) --> Blob(content_changed_1, content_changed_2_2, content_changed_3)

  • Blob: show a snapshot of what the files contained at that time
  • => the commit hash for the last updated version of file1 is 2d135de
# display all commits made to the repo in chronological order, starting with the oldest
# each entry shows: commit hash, author, date, commit message
# a ':' at the bottom <=> there are more commits; press space to show more recent commits, q to quit
git log
  1. Git hash
  • a hash: produced by running the commit's content through a hash function, giving a near-unique fingerprint
  • hashes: allow data sharing between repos
    • 2 files are the same <=> their hashes are the same
    • Git only needs to compare hashes
  1. finding a particular commit
# the repo's history
git log

# display the difference between report.md in that commit versus the latest version
# only need the first 6-8 characters of the hash
git show c27fa856

# Output: log --> diff --> data entry error
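As an aside on the hash idea above, the same content always produces the same hash, which is what lets Git compare files cheaply. A sketch with Python's hashlib (Git itself hashes the object contents plus a header, so this is only an illustration):

import hashlib

content_a = b"TODO: review the report"
content_b = b"TODO: review the report"
content_c = b"TODO: review the report, then publish"

# same content, same hash
print(hashlib.sha1(content_a).hexdigest() == hashlib.sha1(content_b).hexdigest())  # True
# different content, different hash
print(hashlib.sha1(content_a).hexdigest() == hashlib.sha1(content_c).hexdigest())  # False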

Viewing changes

# use a tilde ~ to pick a specific commit to compare versions
# HEAD  : the last commit
# HEAD~1: the second most recent commit
# HEAD~2: the commit before that
git show HEAD~3
# @@ -0,0 +1,47 @@ <=> 47 lines were added

## git show: view changes made in a particular commit
## git diff: compare changes between 2 commits
# compare the 4th and 3rd most recent commits
git diff 35ds45e 18ea93wt1
# or
git diff HEAD~3 HEAD~2

## changes per document by line <=> show line-by-line changes and associated metadata
# shows hash, author, time, line number, line content
git annotate report.md

Undoing changes before committing

Unstaging a file:

Scenario: accidentally add 'file4' to the staging area.

  • Repository(file1, file2, file3, file4) --> Staging Area(file1, file3, file4)

'file4' must be unstaged before we commit the two modified files from the staging area

Solution:

## unstage a single file
git reset HEAD file4
## unstage all files
git reset HEAD

## undo changes to an unstaged file (in the repository)
# checkout: switch to a different version (defaults to the last commit)
# this loses all changes made to the unstaged file forever
git checkout -- file4

## undo changes to all unstaged files
# . <=> the current dir
git checkout .

Restoring and reverting

## customizing the log output
# restrict the number of commits displayed
git log -3  # show 3 commits
# only look at the commit history of one file
git log -3 report.md
# restrict by date: git log --since='Month Day Year'
git log --since='Apr 2 2022'
git log --since='Apr 2 2022' --until='Apr 11 2022'

## restoring an old version of a file
git checkout -- filename
# revert to a version from a specific commit (with the hash)
git checkout dc9d8fac file.csv  # dc9d8fac <=> the 2nd to last commit
# another approach
git checkout HEAD~1 file.csv

## restoring a repo to a previous state
git checkout dc9d8fac  # using the commit hash
# another approach
git checkout HEAD~1

## cleaning a repository
# see what files are not being tracked
git clean -n
# delete those files (cannot be undone - the files are gone for good)
git clean -f

Configuring Git

  • to speed up or improve how we work

3 levels of settings: git config --list

  1. --local: for one specific project
  2. --global: for all of our projects
  3. --system: for every user on this computer
  • user.email
  • user.name
  • core.editor
  • core.repositoryformatversion
  • core.filemode
  • core.bare
  • core.logallrefupdates
# display all settings
git config --list
# check the global settings
git config --global --list

## changing our settings: git config --global setting value
git config --global user.email your_email@example.com
git config --global user.name 'Dinh Nguyen'

## using an alias
# for committing files by executing ci
git config --global alias.ci 'commit -m'
# now commit files with
git ci
# for unstaging files
git config --global alias.unstage 'reset HEAD'

## tracking aliases
# the .gitconfig file contains, e.g.:
#   alias.ci=commit -m
#   alias.unstage=reset HEAD
git config --global --list

# an alias to check the state of files
git config --global alias.st status
# => git st

## ignoring specific files
nano .gitignore
# *.log <=> (*: wildcard) Git will ignore any files ending with .log
# other files commonly ignored include API keys or credentials, system files, software dependencies

Branches

Git uses branches to systematically track multiple versions of files

Visualizing branches: Main, Analysis, Report (over time)

Source and destination: (when merging 2 branches)

  • parent commits: commits
  • source: the branch we want to merge from
  • destination: the branch we want to merge into
  • when merging Analysis into Main: Analysis = source, Main = destination

Benefits of branches:

  • avoiding endless subdirectories
  • multiple users can work simultaneously
  • everything is tracked
  • minimizes the risk of conflicting versions
## identifying branches
git branch
# branch1
# main
# * branch2   => * <=> we are currently on this branch

## creating (and switching to) a new branch named report
git checkout -b report

## the difference between branches
git diff main branch1  # shows the content that was added in the 2nd branch

Working with branches

Why do we need to switch branches?

  • common to work on different components of a project simultaneously
  • branches allow us to keep making progress concurrently

Main[current code][Code Updated][Code Updated] --> Testing [Test1][Test2][Test3][Evaluate] and --> Debugging [Analyze][Update][Test]

## switch to another branch
git checkout debugging

Why do we merge branches?

  • main = ground truth
  • each branch should be for a specific task
  • once the task is completed, we should merge our changes into main
    • to keep it up to date and accurate
## merging branches (git merge source destination)
git merge branch1 main
# Merge output shows:
# 1. the last commit hashes from each branch
# 2. the type of merge
#    Type of merge = a fast-forward merge
#    <=> additional commits were made on the branch1 branch,
#        so Git brings the main branch up to date
# 3. the number of lines changed
# 4. the files modified

Handling conflict

## Step 1: attempt to merge (causing a conflict)
git merge branch1 main

## Step 2: check the difference between the versions in the two branches
nano file1
# <<<<<<< HEAD
# .....
# =======
# ......
# >>>>>>> update
# current branch content: between <<<<<<< HEAD and =======
# update branch content: between ======= and >>>>>>> update

## Step 3: delete or keep lines manually

## Step 4: add the file to the staging area again
git add file1

## Step 5: commit
git commit -m "..."

## Step 6: check
git merge branch1 main  # msg: 'Already up to date.'

How do we avoid conflicts?

  • use each branch for a specific task
  • avoid editing a file in multiple branches
  • doesn't guarantee we'll avoid conflicts

Collaborating with Git

Creating repos

Why make a repo? - Benefits of repos:

  • Systematically track versions
  • Collaborate with colleagues
  • Git stores everything

Nested repositories

  • don't create a Git repo inside another Git repo!
## creating a new repo
git init workspace1
cd workspace1
git status

## converting an existing project
git init
git status  # to see if there are untracked files

Working with remotes

Local Repo <--> Remote Repo

Benefits of remote repos:

  • everything is backed up
  • collaboration, regardless of location

Cloning a remote

  • Remote repos are stored in an online hosting service (GitHub, GitLab, Bitbucket)

Identifying a remote

  • when cloning a repo, Git remembers where the original was
  • Git stores a remote tag in the new repo's configuration
## cloning locally: git clone path-to-project-dir
git clone /home/dinh/repo
# give the cloned repo a name
git clone /home/dinh/repo new_repo

## cloning remotely: git clone [URL]
git clone https://github.com/datacamp/project

## origin: the name of the remote
## add more remotes by specifying a name for Git to assign
## defining remote names is useful for merging branches
## git remote add name URL
git remote add dinh https://github.com/dc/repo
# name the remote 'dinh2' to serve as a shortcut when working between branches
git remote add dinh2 /home/john/repo

# get the remote tag(s)/remote name(s) of the remote repos linked to your project
git remote
# datacamp
# more remote tags are listed (for fetch and pull)

# list all remotes including their URL(s)
git remote -v

Gathering from a remote

[Fetching]To compare the files in a remote against the contents of a local repo, we first need to fetch versions from the remote.

[Synchronizing]After fetching, we now have the contents of the remote in our local repo. However, we need to synchronize contents between the two repos.

[Pulling] As the remote is the source of truth, it's often ahead of local repos, so the workflow of fetching content and synchronizing locally is very common. To simplify this process, Git lets us fetch and merge using a single command.

Note: [Pulling with unsaved local changes]If we have been working locally and not yet committed our changes, then Git won't allow us to pull from a remote

## fetching from a remote: git fetch remote_name local_branch_to_fetch_into
git fetch origin main

## synchronizing content: git merge remote_name local_branch_to_merge
git merge origin main

## pulling from a remote (fetch + merge): git pull remote_name local_branch_to_pull
git pull origin main

# compare the remote repo with your local main branch
git diff origin main

Pushing to a remote

Pulling from a remote: Remote Repo --pull--> Local Repo

Pushing from a remote: Remote Repo <--push-- Local Repo

# save changes locally first!
# git push remote local_branch
git push origin main

## Q: what happens if we don't start the workflow by pulling from the remote?
# pushing first
git push origin main
# Answer: remote/local conflicts
# 1st line: the remote repo URL
# 2nd and 3rd lines: the outcome of the command (rejected and failed)
# remaining lines: Git provides hints (reasons and suggestions)

## Solution:
# step 1: pull
git pull origin main
# step 2: add a message for the merge => note that we're pulling the latest report from the remote
# step 3: (not recommended) avoid leaving a message
git pull --no-edit origin main
# step 4: push to the remote
git push origin main

Writing Efficient Python Code

  1. How to write clean, fast, and efficient Python code
  2. How to profile your code for bottlenecks
  3. How to eliminate bottlenecks and bad design patterns

Defining efficient

  • writing efficient Python code
    • minimal completion time (fast runtime)
    • minimal resource consumption (small memory footprint)
    • focus on readability
    • using Python's constructs as intended (i.e., Pythonic)
# Non-Pythonic
doubled_numbers = []
for i in range(len(numbers)):
    doubled_numbers.append(numbers[i] * 2)

# Pythonic
doubled_numbers = [x * 2 for x in numbers]

The Zen of Python:

  • Beautiful is better than ugly
  • Explicit is better than implicit
  • Simple is better than complex
  • Complex is better than complicated
  • Flat is better than nested
  • Sparse is better than dense
  • Readability counts
  • Special cases aren't special enough to break the rules
  • Although practicality beats purity
  • Errors should never pass silently
  • Unless explicitly silenced
  • In the face of ambiguity, refuse the temptation to guess
# range(start, stop)
range(0, 11)
# range(stop)
range(11)
# range(start, stop, step)
range(2, 11, 2)
# unpacking a range object: [*range(start, stop, step)] <=> list(range(start, stop, step))
nums = [*range(2, 11, 2)]

# enumerate(): creates an indexed list of objects
letters = ['a', 'b']
indexed_letters = enumerate(letters)
list(indexed_letters)        # [(0, 'a'), (1, 'b')]
enumerate(letters, start=5)  # indexing starts at 5
# unpack an enumerate: [*enumerate(iterable, start=index)]
# <=> [(i, val) for i, val in enumerate(names)]
[*enumerate(names, start=1)]

# map(): applies a function over an object
nums = [1.5, 2.3]
n = map(round, nums)
list(n)                      # [2, 2]
sq = map(lambda x: x ** 2, nums)
list(sq)
# unpack a map object into a list: [*map(func, iterable)]
names_map = map(str.upper, names)
names_uppercase = [*names_map]

NumPy arrays

NumPy arrays are homogeneous (which means that they must contain elements of the same type) => more memory efficient and faster than Python lists

  • all elements being the same type eliminates the overhead needed for data type checking

NumPy array broadcasting performs operations over entire collections of values quickly

  • NumPy arrays vectorize operations, so they're performed on all elements of an object at once => efficiently perform calculations over entire arrays
import numpy as np

## 1. alternative to Python lists: np.array(range(n)) <=> list(range(n))
a = np.array(range(2))   # array([0, 1]) <=> [0, 1]
a.dtype

## 2. Python lists don't support broadcasting
nums = [0, 1]
nums ** 2                # TypeError: unsupported operand type(s) ...
nums = np.array([0, 1])
nums ** 2

## 3. n-D arrays (basic 1-D indexing: lists <=> arrays)
nums2 = [[1, 2], [3, 4]]
nums2_np = np.array(nums2)
# basic 2-D indexing
nums2[0][1]                   # 2
nums2_np[0, 1]                # 2
[row[0] for row in nums2]     # [1, 3]
nums2_np[:, 0]                # all rows of column 0 => array([1, 3])

## 4. boolean indexing
nums2_np[nums2_np > 0]        # array([1, 2, 3, 4])
# no boolean indexing for lists; use a comprehension instead
[num for num in nums if num > 0]

## exercise snippets (here nums is a 2-D array)
nums[1, :]                    # 2nd row
nums[nums > 6]                # all elements of nums > 6
nums_dbl = nums * 2           # double every element
nums[:, 2] = nums[:, 2] + 1   # update the 3rd column (add 1 to every element)

## An example
arrival_times = [*range(10, 60, 10)]         # create a list of arrival times
arrival_times_np = np.array(arrival_times)   # convert them to an array
new_times = arrival_times_np - 3             # update the times
# pair guests with new times
guest_arrivals = [(names[i], time) for i, time in enumerate(new_times)]
# welcome guests
welcome_map = map(welcome_guest, guest_arrivals)
guest_welcomes = [*welcome_map]              # unpack
print(*guest_welcomes, sep='\n')

Examining runtime

Why should we time our code?

  • allows us to pick the optimal coding approach
  • faster code == more efficient code

How can we time our code?

## Line magic: %timeit
%timeit nums = np.random.rand(1000)   # reports a mean & standard deviation of time
## -r: the number of runs; -n: the number of loops per run
%timeit -r2 -n10 nums = np.random.rand(1000)

## Cell magic: %%timeit (multiple lines of code)
%%timeit   # then Shift+Enter
nums = []
for x in range(10):
    nums.append(x)

## Save the output: -o
times = %timeit -o rand_nums = np.random.rand(1000)
times.timings   # time for each run
times.best      # the best time of all runs
times.worst     # the worst time of all runs

## Comparing times
## data structures: list() or []; dict() or {}; tuple() or ()
# runtime difference when creating a dict
f_time = %timeit -o formal_dict = dict()
l_time = %timeit -o literal_dict = {}
diff = (f_time.average - l_time.average) * (10 ** 9)   # in nanoseconds
# runtime difference when creating a list
f_time = %timeit -o formal_list = [num for num in range(51)]
l_time = %timeit -o literal_list = [*range(51)]
diff = (f_time.average - l_time.average) * (10 ** 9)
# specify 5 runs each with 25 loops for converting a list to a set
%timeit -r5 -n25 set([1, 3, 4])

Code profiling for runtime

What if we need to time a large code base or see the line-by-line runtimes within a function?

Code profiling is a technique used to describe how long, and how often, various parts of a program are executed

Code profiling with line_profiler (pip install line_profiler)

  • detailed stats on frequency and duration of function calls
  • line-by-line analyses
pip install line_profiler
%load_ext line_profiler   # load it into our IPython session

heroes = ['Batman', 'Superman']
hts = np.array([188.0, 191.0])
wts = np.array([95.0, 101.0])

def convert_units(heroes, heights, weights):
    new_hts = [ht * 0.39 for ht in heights]
    new_wts = [wt * 2.20 for wt in weights]
    hero_data = {}                                   # dictionary
    for i, hero in enumerate(heroes):
        hero_data[hero] = (new_hts[i], new_wts[i])   # tuple
    return hero_data

convert_units(heroes, hts, wts)

# magic command for line-by-line times
# -f: to profile a function
%lprun -f convert_units convert_units(heroes, hts, wts)
# output: a summarized table

Code profiling for memory usage

### Quick and dirty approach
import sys
nums_list = [*range(1000)]
sys.getsizeof(nums_list)

import numpy as np
nums_np = np.array(range(1000))
sys.getsizeof(nums_np)
## only gives the size of an individual object
## Q: what if we wanted to inspect the line-by-line memory footprint of our code?

Code profiling: memory

  • detailed stats on memory consumption
  • line-by-line analyses
  • memory is reported in mebibytes (MiB, roughly a megabyte)

%mprun output caveats

  • inspects memory by querying the OS
  • results may differ between platforms and runs
    • can still observe how each line of code compares to others based on memory consumption
pip install memory_profiler
%load_ext memory_profiler
%mprun -f convert_units convert_units(heroes, hts, wts)

Efficiently combining, counting, and iterating

The collections module

  • specialized container datatypes
    • alternatives to general purpose dict, list, set, and tuple
  • notable (see the sketch after this list):
    • namedtuple: tuple subclasses with named fields
    • deque: list-like container with fast appends and pops
    • Counter: dict for counting hashable objects
    • OrderedDict: dict that retains order of entries
    • defaultdict: dict that calls a factory function to supply missing values
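A minimal sketch of a few of these containers (the example values are made up for illustration; only Counter appears in the course snippets further below):

from collections import namedtuple, deque, defaultdict

# namedtuple: tuple subclass with named fields
Pokemon = namedtuple('Pokemon', ['name', 'hp'])
p = Pokemon('Squirtle', 44)
p.name, p.hp                 # ('Squirtle', 44)

# deque: fast appends and pops on both ends
d = deque([1, 2, 3])
d.appendleft(0)              # deque([0, 1, 2, 3])
d.pop()                      # 3

# defaultdict: supplies missing values via a factory function
type_counts = defaultdict(int)
for t in ['Grass', 'Dark', 'Grass']:
    type_counts[t] += 1      # no KeyError for missing keys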

The itertools module

  • functional tools for creating and using iterators
  • Notable (see the sketch after this list):
    • infinite iterators: count, cycle, repeat
    • finite iterators: accumulate, chain, zip_longest
    • combination generators: product, permutations, combinations
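Similarly, a minimal sketch of the itertools entries that the snippets below don't cover (the example values are made up):

from itertools import count, chain, product

# infinite iterator: must be advanced/limited explicitly
counter = count(start=10, step=2)
[next(counter) for _ in range(3)]      # [10, 12, 14]

# chain: iterate over several finite iterables as if they were one
[*chain(['Grass', 'Dark'], ['Fire'])]  # ['Grass', 'Dark', 'Fire']

# product: cartesian product of the input pools
[*product(['A', 'B'], [1, 2])]         # [('A', 1), ('A', 2), ('B', 1), ('B', 2)]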
## combining objects
names = ['Bul', 'Char', 'Squir']
hps = [45, 39, 44]
# bad: loop with enumerate
combined = []
for i, pokemon in enumerate(names):
    combined.append((pokemon, hps[i]))

## combining objects with zip
combined_zip = zip(names, hps)        # <class 'zip'>
combined_zip_list = [*combined_zip]

## counting with a loop
poke_types = ['Grass', 'Dark']
type_counts = {}
for poke_type in poke_types:
    if poke_type not in type_counts:
        type_counts[poke_type] = 1
    else:
        type_counts[poke_type] += 1

## counting with collections.Counter()
from collections import Counter
type_counts = Counter(poke_types)
# comparing runtimes: Counter takes about half the time of the standard dict approach

## combinations with a loop
combos = []
for x in poke_types:
    for y in poke_types:
        if x == y:
            continue
        if ((x, y) not in combos) & ((y, x) not in combos):
            combos.append((x, y))

## combinations with itertools.combinations()
from itertools import combinations
combos_obj = combinations(poke_types, 2)   # length of each combination: 2 (a pair)
combos = [*combos_obj]

Set theory

  • Comparing 2 objects to observe similarities and differences between their contents
  • 'set' datatype with accompanying methods
    • intersection(): in both sets
    • difference(): in one set but not the other
    • symmetric_difference(): in exactly one set (not both)
    • union(): in either set
  • fast membership testing
    • check if a value exists in a sequence or not (using 'in')
list_a = ['bul', 'cha', 'squi']
list_b = ['cater', 'pidg', 'squi']
set_a = set(list_a)
set_b = set(list_b)

set_a.intersection(set_b)           # 'squi'
%timeit in_common = set_a.intersection(set_b)
set_a.difference(set_b)             # 'bul', 'cha'
set_a.symmetric_difference(set_b)   # 'bul', 'cha', 'cater', 'pidg'
set_a.union(set_b)                  # 'bul', 'cha', 'cater', 'pidg', 'squi'

## membership testing with sets
names_list = ['bul', 'cha', 'squi']    # %timeit 'squi' in names_list
names_tuple = ('bul', 'cha', 'squi')   # %timeit 'squi' in names_tuple
names_set = {'bul', 'cha', 'squi'}     # %timeit 'squi' in names_set

## uniques with sets
primary_types = ['bulaga', 'charoge', 'squirtle', ...]
set(primary_types)                     # a set of distinct types

## using 'in'
print('Psy' in ash_list)
print('Oke' in ash_set)

Eliminating loops

Benefits of eliminating loops:

  • fewer lines of code
  • better code readability: "flat is better than nested"
  • efficiency gains
# looping approach
%%timeit
totals = []
for row in poke_stats:
    totals.append(sum(row))

# list comprehension
%timeit total_comp = [sum(row) for row in poke_stats]

## Solution 1: built-in map()
%timeit total_map = [*map(sum, poke_stats)]

## Solution 2: module approach
from itertools import combinations
combos2 = [*combinations(poke_types, 2)]

## Solution 3: with NumPy
import numpy as np
%timeit avgs_np = poke_stats.mean(axis=1)

# Step 1: collect Pokémon that belong to generation 1 or generation 2
pokemons = [name for name, gen in zip(p_names, p_gens) if gen < 3]
# Step 2: create a map object of name lengths
name_map = map(len, pokemons)
# Step 3: combine into a list
g_lengths = [*zip(pokemons, name_map)]
print(g_lengths[:5])

Writing better loops

  • understand what's being done with each loop iteration
  • move one-time calculations outside (above) the loop
  • use holistic conversions outside (below) the loop
  • anything that is done once should be outside the loop
import numpy as np
names = ['Abs', 'Aron', 'Jyn', 'Onix']
attacks = np.array([130, 50, 45, 70])

%%timeit
## moving one-time calculations above the loop
total_atk_avg = attacks.mean()
for pokemon, atk in zip(names, attacks):
    if atk > total_atk_avg:
        print("{}".format(pokemon))

## %%timeit
poke_data_tuples = []
for poke_tuple in zip(names, legend_status, generations):
    # not efficient => should convert outside the loop
    # poke_list = list(poke_tuple)
    # poke_data.append(poke_list)
    poke_data_tuples.append(poke_tuple)
## using a holistic conversion below the loop
poke_data = [*map(list, poke_data_tuples)]

Intro to pandas DataFrame iteration

## calculating win percentage
import numpy as np

def calc_win_perc(wins, games_played):
    win_perc = wins / games_played
    return np.round(win_perc, 2)

## adding win % to a DataFrame
# Approach 1 - inefficient: .iloc
%%timeit
win_perc_list = []
for i in range(len(baseball_df)):
    row = baseball_df.iloc[i]   # iloc: look up each individual row using its index
    wins = row['W']             # W column
    games_played = row['G']     # G column
    win_perc = calc_win_perc(wins, games_played)
    win_perc_list.append(win_perc)
baseball_df['WP'] = win_perc_list

# Approach 2 - .iterrows() (faster than .iloc)
%%timeit
win_perc_list = []
for i, row in baseball_df.iterrows():
    wins = row['W']
    games_played = row['G']
    win_perc = calc_win_perc(wins, games_played)
    win_perc_list.append(win_perc)
baseball_df['WP'] = win_perc_list

Problem: .iterrows() returns each DataFrame row as an (index, pandas Series) pair, so we have to access the row's values by position, e.g., row[1]['Team']

for row in df.iterrows():
    row[1]['Team']       # row[0] is the index, row[1] is a pandas Series

## Faster! .itertuples() returns namedtuples
for row_namedtuple in df.itertuples():
    row_namedtuple.Index
    row_namedtuple.Team
    row_namedtuple.Year

pandas alternative to looping

def calc_run_diff(runs_scored, runs_allowed):
    run_diff = runs_scored - runs_allowed
    return run_diff

# looping with .iterrows()
run_diffs_iterrows = []
for i, row in df.iterrows():
    run_diff = calc_run_diff(row['RS'], row['RA'])
    run_diffs_iterrows.append(run_diff)
df['RD'] = run_diffs_iterrows

## pandas .apply(): takes a function and applies it to a DataFrame
##  - must specify an axis to apply over (0 for columns; 1 for rows)
##  - can be used with anonymous (lambda) functions
## Faster!
run_diffs_apply = df.apply(
    lambda row: calc_run_diff(row['RS'], row['RA']),
    axis=1
)
df['RD'] = run_diffs_apply

test1 = df.apply(sum, axis=0)
test2 = df.apply(lambda row: text_playoffs(row['Playoffs']), axis=1)

Optimal pandas iterating

pandas internals

  • eliminating loops applies to using pandas as well
  • pandas is built on NumPy => take advantage of NumPy array efficiencies
## a column's .values attribute => numpy.ndarray
wins_np = df['W'].values

## broadcasting (vectorizing) is extremely efficient
df['RS'].values - df['RA'].values

## An example
# 2nd fastest: .itertuples()
win_perc_preds_loop = []
for row in df.itertuples():
    r1 = row.RS
    r2 = row.RA
    pred = predict(r1, r2)
    win_perc_preds_loop.append(pred)

# slowest: the .apply() approach
preds_apply = df.apply(lambda row: predict(row['RS'], row['RA']), axis=1)

# fastest: NumPy arrays
preds_np = predict(df['RS'].values, df['RA'].values)
df['WP_preds'] = preds_np

Cleaning Data in Python

Data type constraints

diagnose dirty data -> side effects of dirty data -> clean data

access data -> explore & process data -> extract insights -> report insights

(!) dirty data from human or technical error can enter at any stage: access data -> explore & process data -> extract insights -> report insights

## string to integer
df['R'] = df['R'].str.strip('$')
df['R'] = df['R'].astype('int')
assert df['R'].dtype == 'int'

## convert to categorical
df['ms'] = df['ms'].astype('category')
df.describe()

Data range constraints

How to deal with out of range data?

  • dropping data (only when a small proportion of the dataset is affected)
  • setting custom minimums and maximums
  • treating as missing and imputing (see the sketch after the snippet below)
  • setting a custom value depending on business assumptions
import pandas as pd

# find out-of-range ratings
movies[movies['avg_rating'] > 5]
# drop via filtering
movies = movies[movies['avg_rating'] <= 5]
# drop via .drop(); inplace=True drops values in place without creating a new DataFrame
movies.drop(movies[movies['avg_rating'] > 5].index, inplace=True)
# assert results
assert movies['avg_rating'].max() <= 5
# or cap: convert avg_rating > 5 to 5
movies.loc[movies['avg_rating'] > 5, 'avg_rating'] = 5

## Date range example
# convert to date
import datetime as dt
df['sub_date'] = pd.to_datetime(df['sub_date']).dt.date
today_date = dt.date.today()

# 1. drop the data
# using filtering
df = df[df['sub_date'] < today_date]
# using .drop()
df.drop(df[df['sub_date'] > today_date].index, inplace=True)

# 2. hardcode future dates with today's date as the upper limit
df.loc[df['sub_date'] > today_date, 'sub_date'] = today_date
# assert the result
assert df['sub_date'].max() <= today_date
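The "treat as missing and impute" option from the list above isn't shown in the snippet; a minimal sketch, reusing the movies/avg_rating example and picking mean imputation purely for illustration:

import numpy as np

# mark out-of-range ratings as missing instead of dropping them
movies.loc[movies['avg_rating'] > 5, 'avg_rating'] = np.nan

# impute with a statistical measure (here: the mean of the remaining valid values)
movies['avg_rating'] = movies['avg_rating'].fillna(movies['avg_rating'].mean()).round(1)
assert movies['avg_rating'].max() <= 5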

Uniqueness constraints

## get duplicates across all columns
df.duplicated()        # True (duplicate row) / False ...
df[df.duplicated()]    # or df[df.duplicated() == True]

## .duplicated(subset, keep)
##   subset: list of column names to check for duplication
##   keep: mark duplicates except the 'first', except the 'last', or mark all of them (False)
##         (catches complete + incomplete duplicates)
col_names = ['first_name', 'last_name', 'address']   # list
duplicates = df.duplicated(subset=col_names, keep=False)
df[duplicates].sort_values(by='first_name')

## .drop_duplicates(subset, keep, inplace)
##   inplace=True: drop duplicated rows directly inside the DataFrame without creating a new object
df.drop_duplicates(inplace=True)

## .groupby() + .agg(): treat incomplete duplicates
# group by column names and produce statistical summaries
summaries = {'height': 'max', 'weight': 'mean'}   # dictionary
# drop incomplete duplicates by grouping by col_names and applying the aggregations
# reset_index() to get numbered indices in the final output
df.groupby(by=col_names).agg(summaries).reset_index()
print(df[['col1', 'col2', 'col3']])

Membership constraints

To run ML models on categorical data, categories are often coded as numbers, e.g., (unmarried, married) => (0, 1)

Solutions: 1. dropping data; 2. remapping categories; 3. inferring categories

Joins (see the sketch after this list):

  • anti: what's in A and not in B
  • inner: what's in both A & B
  • left anti join: what's in A only
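A minimal sketch of the anti and inner joins above using pandas merges; the DataFrame and column names (study_data, categories, blood_type) are illustrative, not from the notes:

import pandas as pd

# study_data holds observed values; categories holds the allowed values
# left anti join: rows of study_data whose blood_type is NOT in categories
merged = study_data.merge(categories, on='blood_type', how='left', indicator=True)
anti = merged[merged['_merge'] == 'left_only'].drop(columns='_merge')

# inner join: rows whose blood_type IS in categories
inner = study_data.merge(categories, on='blood_type', how='inner')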
## Finding inconsistent categories
inconsistent_cats = set(df['col1']).difference(df2['col1'])

## Get rows with inconsistent categories
df['col1'].isin(inconsistent_cats)

## Dropping inconsistent categories
# find the col1 categories in df that are not in df2
inconsistent_cats = set(df['col1']).difference(df2['col1'])
# find rows with those categories
inconsistent_rows = df['col1'].isin(inconsistent_cats)
inconsistent_data = df[inconsistent_rows]
consistent_data = df[~inconsistent_rows]

# equivalent, condensed
category_clean = set(df['col1']).difference(df2['col1'])
category_clean_rows = df['col1'].isin(category_clean)
df[~category_clean_rows]

Categorical variables

What type of errors could we have?

  1. value inconsistency
    • inconsistent fields: 'married', 'Maried', 'UNMARRIED', 'not married'
    • leading/trailing white spaces: 'married', ' married'
  2. collapsing too many categories into too few
    • creating new groups: 0-20K, 20-40K categories... from continuous household income data
    • mapping groups to new ones: mapping household income categories to 2 groups, 'rich' and 'poor'
  3. making sure data is of type category
# get the column and count values in a Series
marriage_status = demographics['marriage_status']
marriage_status.value_counts()
# count values in a DataFrame (a 2-D, rows-and-columns dataset)
demographics.groupby('marriage_status').count()

## Value consistency
# capitalize
demographics['marriage_status'] = demographics['marriage_status'].str.upper()
demographics['marriage_status'].value_counts()
# lowercase
demographics['marriage_status'] = demographics['marriage_status'].str.lower()
demographics['marriage_status'].value_counts()
# strip all spaces
demographics['marriage_status'] = demographics['marriage_status'].str.strip()
demographics['marriage_status'].value_counts()

## Collapsing data into categories
import pandas as pd
import numpy as np
# create categories out of data using qcut() (equal-sized quantile groups)
group_names = ['0-200K', '200-500K', '500K+']
demographics['income_group'] = pd.qcut(demographics['household_income'], q=3, labels=group_names)
demographics[['income_group', 'household_income']]
# using cut(): create explicit ranges and names - more precise than qcut()
ranges = [0, 200000, 500000, np.inf]
group_names = ['0-200K', '200-500K', '500K+']
demographics['income_group'] = pd.cut(demographics['household_income'], bins=ranges, labels=group_names)
demographics[['income_group', 'household_income']]

## Map categories to fewer ones (reducing categories in a categorical column)
# create a mapping dictionary and replace
mapping = {'MS': 'DesktopOS', 'MacOS': 'DesktopOS', 'Linux': 'DesktopOS',
           'IOS': 'MobileOS', 'Android': 'MobileOS'}
devices['operating_system'] = devices['operating_system'].replace(mapping)
devices['operating_system'].unique()

# more examples
label_ranges = [0, 60, 180, np.inf]
label_names = ['short', 'medium', 'long']
airlines['wait_type'] = pd.cut(airlines['wait_min'], bins=label_ranges, labels=label_names)

mappings = {'Monday': 'weekday', 'Tuesday': 'weekday', 'Wednesday': 'weekday',
            'Thursday': 'weekday', 'Friday': 'weekday',
            'Saturday': 'weekend', 'Sunday': 'weekend'}
airlines['day_week'] = airlines['day'].replace(mappings)

Cleaning text data

Common text data problems:

  1. data inconsistency: +96... or 0096 or ...?
  2. fixed length violations: passwords need to be at least 8 characters
  3. typos: +96.71.67784
## normalizing the phone numbers
# 'assert' returns nothing if the condition passes
# find the length of each row
sanity_check = phone['Phone_number'].str.len()
# assert the minimum phone number length is 10
assert sanity_check.min() >= 10
# assert no number contains '+' or '-' (escape '+' since it is a regex special character)
assert phone['Phone_number'].str.contains(r"\+|-").any() == False

## more complicated examples: +(01875)-25394, +0500-57812, +0800-1111
## Solution: regular expressions
# replace anything that is not a digit with nothing
phone['Phone_number'] = phone['Phone_number'].str.replace(r'\D+', '', regex=True)

## another example
# length of each row
resp_len = airlines['survey_response'].str.len()
airlines_survey = airlines[resp_len > 40]
assert airlines_survey['survey_response'].str.len().min() > 40

Uniformity

  • temperature: 32°C or 89.6°F
  • weight: 70 kg or 11 st
  • date: 26-11-2019 or 26 November 2019
  • money: $100 or ¥10,763.90

Datetime formatting (datetime)

  • 25-12-2019 => %d-%m-%Y; December 25th 2019 => %c; 12-25-2019 => %m-%d-%Y
  • pandas.to_datetime()
## Treating temperature data
temp_fah = temperatures.loc[temperatures['Temperature'] > 40, 'Temperature']
temp_cels = (temp_fah - 32) * (5 / 9)
temperatures.loc[temperatures['Temperature'] > 40, 'Temperature'] = temp_cels
assert temperatures['Temperature'].max() < 40

## Convert to datetime
# Fail!
birthdays['Birthday'] = pd.to_datetime(birthdays['Birthday'])   # ValueError: month must be in 1..12
# Success
birthdays['Birthday'] = pd.to_datetime(
    birthdays['Birthday'],
    # attempt to infer the format of each date
    infer_datetime_format=True,
    # return NA for rows where conversion failed
    errors='coerce'
)

## extract the year from the amended account_opened column and assign it to the acct_year column
banking['acct_year'] = banking['account_opened'].dt.strftime('%Y')
# another formatting example: dt.strftime("%d-%m-%Y")
birthdays['Birthday'] = birthdays['Birthday'].dt.strftime("%d-%m-%Y")

Treating ambiguous date data: is 2019-03-08 in August or March? Options (see the sketch after this list):

  • convert to NA and treat accordingly (drop them)
  • infer format by understanding data source
  • infer format by understanding previous and subsequent data in DataFrame
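A minimal sketch of the first two options; the df and 'date' names are placeholders, and the day-first format is only an assumption about the data source:

import pandas as pd

# option 1: coerce ambiguous/unparseable values to NaT, then drop them
dates = pd.to_datetime(df['date'], errors='coerce')
df = df[dates.notna()]

# option 2: if the source is known to use day-first dates, state the format explicitly
df['date'] = pd.to_datetime(df['date'], format='%d-%m-%Y', errors='coerce')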

Cross field validation

  • example: confirming the Age provided by users by cross checking their birthdays
  • the use of multiple fields in a dataset to sanity-check data integrity (i.e., to verify correctness)
# axis=1: row-wise summing
sum_classes = flights[['economy', 'business', 'first']].sum(axis=1)
passenger_equ = sum_classes == flights['total_passengers']
# find and filter out rows with inconsistent passenger totals
inconsistent_pass = flights[~passenger_equ]
consistent_pass = flights[passenger_equ]

## Cross field validation example
import pandas as pd
import datetime as dt
# convert to datetime and get today's date
users['Birthday'] = pd.to_datetime(users['Birthday'])
today = dt.date.today()
# for each row in the Birthday column, calculate the year difference
age_manual = today.year - users['Birthday'].dt.year
# find instances where ages match
age_equ = age_manual == users['Age']
# find and filter out rows with inconsistent ages
inconsistent_age = users[~age_equ]
consistent_age = users[age_equ]
print("# of inconsistent ages: ", inconsistent_age.shape[0])

What to do when we catch inconsistencies? (see the sketch after this list)

  • dropping data
  • set to missing and impute
  • apply rules from domain knowledge
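Continuing the users example above, a minimal sketch of applying a domain rule; treating the birthday as the source of truth is an assumption made here purely for illustration:

# the birthday is taken as the source of truth,
# so recompute the age for the inconsistent rows instead of dropping them
users.loc[~age_equ, 'Age'] = age_manual[~age_equ]
assert (users['Age'] == (today.year - users['Birthday'].dt.year)).all()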

Completeness and missing data

# return missing values
airquality.isna()
# get a summary of missingness
airquality.isna().sum()

## missingno: a useful package for visualizing and understanding missing data
## missingno.matrix(df): visualize the missingness of the DataFrame
import missingno as msno
import matplotlib.pyplot as plt
# visualize missingness
msno.matrix(airquality)
plt.show()

# isolate the rows with missing CO2 values and keep the complete rows aside
missing = airquality[airquality['CO2'].isna()]
complete = airquality[~airquality['CO2'].isna()]
# describe the complete DataFrame and the missing DataFrame
complete.describe()
missing.describe()

# all missing CO2 values occur at really low temperatures,
# so sort by Temperature and pass the sorted DataFrame to the matrix function
sorted_airquality = airquality.sort_values(by='Temperature')
# missing values now sit at the top, because values are sorted from smallest to largest by default
msno.matrix(sorted_airquality)
plt.show()
# Conclusion: CO2 measurements are lost at really low temperatures. It must be a sensor failure!

Missingness types

  • missing completely at random (MCAR)
    • no systematic relationship between missing data and other values
    • "data entry errors when inputting data"
  • missing at random (MAR)
    • systematic relationship between missing data and other observed values
    • "missing Ozone data for high temperatures"
  • missing not at random (MNAR)
    • systematic relationship between missing data and unobserved values
    • "missing Temperature values for high Temperatures"

How to deal with missing data?

  • simple approaches
    1. drop missing data
    2. impute with statistical measures (mean, median, mode..)
  • more complex approaches (see the sketch after the snippet below)
    1. imputing using an algorithmic approach
    2. impute with ML models
# drop missing values
airquality.dropna(subset=['col1'])
# replace with statistical measures
col1_mean = airquality['col1'].mean()
airquality.fillna({'col1': col1_mean})
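For the more complex approaches, a minimal sketch using scikit-learn's KNNImputer; scikit-learn and the chosen columns are assumptions, not part of these notes:

from sklearn.impute import KNNImputer

# impute missing CO2 values from the k nearest rows of the selected numeric columns
num_cols = ['Temperature', 'CO2']
imputer = KNNImputer(n_neighbors=5)
airquality[num_cols] = imputer.fit_transform(airquality[num_cols])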

Comparing strings

  • the smallest number of steps needed to transition from one string to another: insert (+), delete (-), substitute, transpose (<=>)
  • e.g., a minimum edit distance of 2 could be one deletion and one insertion (-, +)

Minimum edit distance algorithms:

  • Damerau-Levenshtein: insert, substitute, delete, transpose
  • Levenshtein: insert, substitute, delete (see the sketch after this list)
  • Hamming: substitution only
  • Jaro distance: transposition only
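A quick illustration of the Levenshtein distance itself; this assumes the python-Levenshtein package, which the notes don't use (thefuzz below builds on the same idea but returns a 0-100 similarity score):

# pip install python-Levenshtein
import Levenshtein

# one substitution ('e' -> 'a') turns 'Reeding' into 'Reading'
Levenshtein.distance('Reeding', 'Reading')   # 1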
from thefuzz import fuzz

## simple comparison: a similarity score from 0 to 100
fuzz.WRatio('Reeding', 'Reading')   # 86 (% similar)
## partial string comparison
fuzz.WRatio('Houston Rockets', 'Rockets')
## partial string comparison with a different order
fuzz.WRatio('Houston Rockets vs Los Angeles Lakers', 'Lakers vs Rockets')

## compare with arrays
from thefuzz import process
import pandas as pd
string = "Houston Rockets vs Los Angeless Lakers"
choices = pd.Series([
    'Rockets vs Lakers', 'Lakers vs Rockets',
    'Houston vs Los Angeless', 'Heat vs Bulls'
])
# return only the 2 best possible matches, ranked from highest to lowest
process.extract(string, choices, limit=2)

Collapsing categories with string similarity

  • Chapter 2: use .replace() to collapse "eur" into "Europe"
  • Problem: what if there are too many variations? => process.extract(str, arr, limit)
# for each correct category
for state in categories['state']:
    # find potential matches in states with typos
    # survey.shape[0]: survey's length, so every row is considered
    matches = process.extract(state, survey['state'], limit=survey.shape[0])
    # for each potential match
    for potential_match in matches:
        # each match is a ('text', similarity_score) tuple; 80 is the cutoff point
        if potential_match[1] >= 80:
            # replace the typo with the correct category
            survey.loc[survey['state'] == potential_match[0], 'state'] = state

Generating pairs

Table A (event, time): [('event1', 19:00), ('event1', 20:00), ('event1', 21:00)] --link by Time to--> Table B (event, time): [('eventX', 7pm), ('eventY', 8pm), ('eventZ', 9pm)]

Record linkage: (recordlinkage)

  • Data A + Data B --generate--> Pairs --compare--> Pairs --score--> Pairs --link--> Data

Problem: ideally we want to generate all possible pairs between our DataFrames, but what if we had big DataFrames and ended up having to generate millions if not billions of pairs?

Solution: apply what we call Blocking, which creates pairs based on a matching column, reducing the number of possible pairs

import recordlinkage

## Step 1 - generate pairs
# create an Indexing object
indexer = recordlinkage.Index()
# generate pairs blocked on 'col1'
indexer.block('col1')
pairs = indexer.index(df1, df2)
# output: a pandas MultiIndex object containing pairs of row indices from both DataFrames
# since we've already generated our pairs, it's time to find potential matches

## Step 2 - compare between columns
# create a Compare object
compare_cl = recordlinkage.Compare()
# find exact matches for pairs of col1 and col2 (columns taken from each DataFrame)
compare_cl.exact('col1', 'col1', label='col1')
compare_cl.exact('col2', 'col2', label='col2')
# find similar matches for pairs of col3 and col4 using string similarity
# the similarity cutoff point = 85%
compare_cl.string('col3', 'col3', threshold=0.85, label='col3')
compare_cl.string('col4', 'col4', threshold=0.85, label='col4')

## Step 3 - score the comparison
potential_matches = compare_cl.compute(pairs, df1, df2)
# output: 1 for a match and 0 for a non-match, per column, for each pair of rows
# probable matches: pairs where at least n columns match
# n: the minimum number of matching columns required to call it a duplicate
matches = potential_matches[potential_matches.sum(axis=1) >= n]
# Problem: what should the value of n be (if there are 3 pairing columns)?
# Answer: 3, because we need matches in all our columns

## Step 4 - link the DataFrames
# get the pair indices
matches.index
# get indices from df2 only (level 1 of the MultiIndex holds the df2 row indices)
duplicate_rows = matches.index.get_level_values(1)
# finding duplicates in df2
df2_duplicates = df2[df2.index.isin(duplicate_rows)]
# finding new (non-duplicate) rows in df2
df2_new = df2[~df2.index.isin(duplicate_rows)]
# link the DataFrames
full_df = df1.append(df2_new)