CAN reviewed
A revised approach to CAN bus analysis

    Loading modules

    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import seaborn as sns
    import os
    import glob
    import math
    
    import time
    from datetime import timedelta
    
    #start_time = time.monotonic()
    #end_time = time.monotonic()
    #print(timedelta(seconds=end_time - start_time))

    Start point

    We have a logfile of CAN bus communication, received through a dongle and stored in an ASC file. Cleaning and reformatting the data cannot be done with a simple import statement and a delimiter argument alone.
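To see why a plain delimiter-based import falls short, consider a hypothetical data line in the style of a Vector ASC log (the exact layout varies per tool, so the sample is an assumption for illustration). Splitting at most 14 times, as the cleaning function below does, yields the 14 fields of interest plus a remainder:

```python
# Hypothetical ASC data line; the exact layout is an assumption for illustration.
sample = "0.002345 1 3B0 Rx d 8 12 A3 FF 00 00 00 00 00 Length = 124000"

# Split at most 14 times: tokens 0-13 are the fields of interest
# (timestamp, channel, id, rx_tx, een_d, dlc, D0..D7); token 14 is the remainder.
parts = sample.split(maxsplit=14)
```

The trailing remainder (here a made-up `Length = 124000`) is exactly what the `.iloc[:, :14]` step below discards.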

    In the first step we will:

    • clear the data of unnecessary and superfluous information
    • determine whether the DLC for an ID is unique
    • create subsets, based on ID, for IDs with a unique DLC

    Step 1: Data import and preparation

    # Preformat the ASC-file and remove invalid rows
    asc_logfile = 'golf_log.asc'
    
    # Function for cleaning the original ASC-file
    def clean_file(asc_filepath):
        """
        Converts the ASC file into a DataFrame and
        drops rows without a numeric DLC.
        """
        log = pd.read_csv(
            asc_filepath,
            header=None
        )
        log = log.iloc[:,0].str.split(
            n=14,               # Based on a visual inspection of the logfile
            expand=True         # Columns instead of series
        )\
        .iloc[:, :14]           # Keep only the first 14 columns
        
        columns = [             # List of new column names
            'timestamp',
            'channel',
            'id',
            'rx_tx',
            'een_d',
            'dlc',
            'D0',
            'D1',
            'D2',
            'D3',
            'D4',
            'D5',
            'D6',
            'D7'
        ]
        log.columns = columns
        log.drop(
            ['channel', 'rx_tx', 'een_d'],
            axis=1,
            inplace=True)
        
        # Create mask to identify rows where the dlc column contains non-integer
        mask = pd.to_numeric(
            log['dlc'],         # Takes DLC column
            errors='coerce'     # Non-numeric values become NaN
        )\
        .isna()                 # Creates a boolean series for when DLC is NaN
        
        # Select the rows where the dlc column is Not A Number (NaN)
        dropped_rows = log[mask]
        
        # Remove the selected rows from the original DataFrame
        log = log[~mask]
        log.reset_index(
            drop=True,
            inplace=True
        )
        return log, dropped_rows
    
    log, dropped_rows = clean_file(asc_logfile)

    After cleaning the file we can inspect:

    • log, which is the cleaned data
    • dropped_rows, to inspect the rows that were dropped from the data
    print(dropped_rows)
    print(log)
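The coerce-and-mask step at the heart of clean_file can be checked in isolation. A minimal sketch with a toy `dlc` column (the values `'Statistic:'` and `'ErrorFrame'` are hypothetical stand-ins for non-message lines in a real log):

```python
import pandas as pd

# Toy frame mimicking the 'dlc' column after splitting; the non-numeric
# entries are made-up stand-ins for status/error lines in a real log.
log = pd.DataFrame({'dlc': ['8', 'Statistic:', '4', 'ErrorFrame']})

mask = pd.to_numeric(log['dlc'], errors='coerce').isna()
dropped_rows = log[mask]                         # rows without a numeric DLC
log = log[~mask].reset_index(drop=True)          # cleaned remainder
```

Every original row lands in exactly one of the two frames, which is what makes dropped_rows a useful second check.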

    Next we store all the sublogs in a dictionary that can be called upon later in the process.
    We also want to see a list of identifiers and their DLCs.

    # Split the logfile into separate logs based on unique identifier
    # Store all IDs in a dictionary as keys; each sublog is the value
    # Create an id_dlc_list for an overview of IDs and their DLCs
    
    def create_sublogs(cleaned_logfile):
        """
        Creates dictionary with id as key and sublog as value.
        Note: the identifier keys are stored in lowercase.
        """
        group_dict = {}
        grouped = cleaned_logfile.groupby('id')
        for name, group in grouped:
            group = group.reset_index(drop=True)
            group_dict[name.lower()] = group
        print(f'There are {len(group_dict)} unique identifiers in the log')
        return group_dict
            
    def overview(dictionary):
        """
        Returns a DataFrame containing all found IDs
        and their corresponding DLCs.
        """
        dlc_list = []
        id_list = list(dictionary.keys())
        for key in id_list:
            dlc = list(dictionary[key]['dlc'].unique())
            dlc_list.append(dlc)    
        data = [id_list, dlc_list]
        id_dlc_df = pd.DataFrame(
            data,
            index=['id', 'dlc']
        ).transpose()
        return id_dlc_df
    
    sublog_dictionary = create_sublogs(log)
    overview_df = overview(sublog_dictionary)

    Summary Step 1

    As of now we have the following variables and dictionaries:

    • log: the original ASC file as a DataFrame, cleaned of invalid rows
    • dropped_rows: a DataFrame for inspecting the dropped rows, as a second check that nothing was removed in error
    • sublog_dictionary: a dictionary whose keys are the identifiers and whose values are all rows/messages bearing that ID (the ID subset, in other words)
    • overview_df: a DataFrame listing all found IDs and their corresponding DLCs
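Since Step 2 will skip IDs whose DLC is not unique, overview_df can be used to spot them in advance. A minimal sketch with a hypothetical miniature overview_df (real IDs and DLCs will differ; each `dlc` cell holds the list of unique DLC values for that ID, as built by overview()):

```python
import pandas as pd

# Hypothetical miniature overview_df; real IDs and DLC values will differ.
overview_df = pd.DataFrame({
    'id': ['1a0', '3b0', '5f1'],
    'dlc': [['8'], ['8'], [['4', '8'][0], ['4', '8'][1]]],
})

# IDs with more than one unique DLC (these will be skipped in Step 2)
multi_dlc = overview_df[overview_df['dlc'].str.len() > 1]['id'].tolist()
```

`Series.str.len()` works on list-valued cells, so no manual loop over the dictionary is needed.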

    Step 2: Calculating the parameters (BFR and Magnitude)

    State whether the input file is HEX or DEC

    # Is the log in HEX or DEC?
    dataformat = 'hex'
    Binary dictionary

    Creation of a second dictionary, now with binary information. IDs with more than one unique DLC are dropped.

    # Convert HEX to BIN
    def hex_to_bin(dictionary):
        """
        Converts HEX values into BIN equivalent.
        A new (copied) dictionary is created so the original sublog_dictionary is not altered.
        First, only handle IDs that have exactly one unique DLC.
        Second, convert the HEX values to their BIN equivalents.
        Put those values back in the copied dictionary.
        """
        bin_dictionary = {key: value.copy() for key, value in dictionary.items()} # .copy() gives independent DataFrames, so the original sublogs are untouched
        keys_to_drop = []
        for key, value in bin_dictionary.items():
            if value['dlc'].nunique() != 1:
                print(f'id {key} skipped because its DLC is not unique')
                keys_to_drop.append(key)
            else:
                dlc = int(value['dlc'].iloc[0])
                num_bits = dlc * 8
                for i in range(num_bits):
                    byte_index = i // 8
                    col_name = f'bit_{i:02}'
                    value[col_name] = value[f'D{byte_index}'].apply(
                        lambda x, i=i: bin(int(str(x), 16))[2:].zfill(8)[i%8]
                    )
                value.drop(
                    columns=[f'D{i}' for i in range(dlc)],
                    inplace=True
                )
            bin_dictionary[key] = value
        for key in keys_to_drop:
            del bin_dictionary[key]
        return bin_dictionary
    
    if dataformat == 'hex':
        bin_dictionary = hex_to_bin(sublog_dictionary)
    elif dataformat == 'dec':
        pass                    # DEC logs are not handled yet
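The per-bit expansion inside hex_to_bin can be illustrated on a single byte. A minimal sketch (hex_byte_to_bits is a hypothetical helper, not part of the notebook) using the same bin/int/zfill chain as the lambda above:

```python
def hex_byte_to_bits(x):
    """Convert one hex byte (e.g. 'a3') to its 8 bit characters, MSB first.
    Hypothetical helper mirroring the conversion used in hex_to_bin."""
    return list(bin(int(str(x), 16))[2:].zfill(8))

# 0xA3 = 1010 0011
bits = hex_byte_to_bits('a3')
```

In the notebook, bit i of a message is then taken from byte `i // 8` at position `i % 8`, which is exactly what the `bit_{i:02}` columns store.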