A revised approach to CAN bus analysis
Loading modules
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import glob
import math
import time
from datetime import timedelta
# Optional timing scaffold: uncomment to measure the runtime of a step
#start_time = time.monotonic()
#end_time = time.monotonic()
#print(timedelta(seconds=end_time - start_time))
Starting point
We have a log file of CAN bus communication, captured through a dongle and stored as an ASC file. Cleaning and reformatting the data cannot be done with a simple import statement and a delimiter argument; an illustrative raw line is shown after the list below.
In the first step we will:
- strip the log of superfluous information
- determine whether the DLC for an ID is unique
- create subsets, based on ID, for IDs with a unique DLC
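For orientation, a data line in the ASC log consists of whitespace-separated fields; the line below is invented, but it matches the 14 columns the cleaning function expects. Header and event lines that do not follow this layout are what break a plain delimiter-based import.
# A hypothetical ASC data line (values invented): timestamp, channel, id,
# rx/tx direction, frame type ('d' for data), dlc, then the data bytes
example_line = '0.002345 1 1A0 Rx d 8 12 34 56 78 9A BC DE F0'
print(example_line.split())  # a whitespace split yields the 14 fields used below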
Step 1: Data import and preparation
# Preformat the ASC-file and remove invalid rows
asc_logfile = 'golf_log.asc'
# Function for cleaning the original ASC-file
def clean_file(asc_filepath):
"""
Converts the ASC-file in the correct way to create a DataFrame.
It also drops rows without a numeric DLC.
"""
    log = pd.read_csv(
        asc_filepath,
        header=None  # No header row; each raw log line is read into one column
    )
log = log.iloc[:,0].str.split(
n=14, # Based on a visual inspection of the logfile
expand=True # Columns instead of series
)\
.iloc[:, :14] # Keep only the first 14 columns
columns = [ # List of new column names
'timestamp',
'channel',
'id',
'rx_tx',
'een_d',
'dlc',
'D0',
'D1',
'D2',
'D3',
'D4',
'D5',
'D6',
'D7'
]
log.columns = columns
log.drop(
['channel', 'rx_tx', 'een_d'],
axis=1,
inplace=True)
    # Create a mask marking rows where the dlc column is not numeric
    mask = pd.to_numeric(
        log['dlc'],      # Take the DLC column
        errors='coerce'  # Non-numeric values become NaN
    )\
    .isna()              # Boolean Series: True where DLC is NaN
# Select the rows where the dlc column is Not A Number (NaN)
dropped_rows = log[mask]
# Remove the selected rows from the original DataFrame
log = log[~mask]
log.reset_index(
drop=True,
inplace=True
)
return log, dropped_rows
log, dropped_rows = clean_file(asc_logfile)
After cleaning the file we can inspect:
- log, the cleaned data
- dropped_rows, the rows that were dropped from the data
print(dropped_rows)
print(log)
Next we store all the sublogs in a dictionary that can be called upon later in the process.
We also want an overview of the identifiers and their DLCs.
# Split the logfile into separate sublogs based on unique identifier
# Store all IDs in a dictionary: the ID is the key, the sublog is the value
# Create an id_dlc_list for an overview of IDs and their DLCs
def create_sublogs(cleaned_logfile):
"""
Creates dictionary with id as key and sublog as value.
!! Pay attention, as of now the identifiers are lowercase
"""
group_dict = {}
grouped = cleaned_logfile.groupby('id')
for name, group in grouped:
group = group.reset_index(drop=True)
group_dict[name.lower()] = group
    print(f'There are {len(group_dict)} unique identifiers in the log')
return group_dict
def overview(dictionary):
"""
returns a DataFrame containing all found ID's
and the corresponding DLC's.
"""
dlc_list = []
id_list = list(dictionary.keys())
for key in id_list:
dlc = list(dictionary[key]['dlc'].unique())
dlc_list.append(dlc)
data = [id_list, dlc_list]
id_dlc_df = pd.DataFrame(
data,
index=['id', 'dlc']
).transpose()
return id_dlc_df
sublog_dictionary = create_sublogs(log)
overview_df = overview(sublog_dictionary)
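As a quick sanity check we can print the overview and look up a single sublog by its lowercase identifier; '1a0' below is a hypothetical ID and should be replaced by one that appears in overview_df.
print(overview_df)               # all IDs with their observed DLCs
print(sublog_dictionary['1a0'])  # hypothetical ID; pick one from overview_df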
Summary Step 1
As of now we have the following variables and dictionaries:
log
: the original ASC file as a DataFrame, cleaned of invalid rows
dropped_rows
: a DataFrame for inspecting the dropped rows; a second check that nothing was removed by mistake
sublog_dictionary
: a dictionary where the keys are the identifiers and the values are all the rows/messages bearing that ID (the ID subset, in other words)
overview_df
: a DataFrame listing all found IDs and their corresponding DLCs
Step 2: Calculating the parameters (BFR and Magnitude)
State whether the input file is HEX or DEC
# Is the log in HEX or DEC?
dataformat = 'hex'
Binary dictionary
We create a second dictionary, this time holding the payloads as individual bits. IDs with more than one unique DLC are dropped.
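To make the bit extraction below concrete: each hex byte is expanded into its 8-bit binary representation (MSB first), and frame bit i is character i % 8 of byte i // 8. A minimal illustration:
# Byte '8e' expands to '10001110'; bit 0 of this byte is the MSB
bits = bin(int('8e', 16))[2:].zfill(8)
print(bits)     # '10001110'
print(bits[0])  # '1' -- what the lambda below extracts when i % 8 == 0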
# Convert HEX to BIN
def hex_to_bin(dictionary):
"""
Converts HEX values into BIN equivalent.
The creation of a new (copied) dictionary is required to not alter the original sublog_dictionary.
First, only handle ID for which is only one unique DLC.
Second, convert the HEX values to BIN values.
Put those values back in the copied dictionary.
"""
    bin_dictionary = {key: value.copy() for key, value in dictionary.items()}  # Copy each sublog so the original dictionary stays untouched
keys_to_drop = []
for key, value in bin_dictionary.items():
        if value['dlc'].nunique() != 1:
            print(f'id {key} skipped because it does not have exactly one unique DLC')
keys_to_drop.append(key)
else:
            dlc = int(value['dlc'].iloc[0])
num_bits = dlc * 8
for i in range(num_bits):
byte_index = i // 8
col_name = f'bit_{i:02}'
                value[col_name] = value[f'D{byte_index}'].apply(
                    # Expand the hex byte to an 8-bit string, pick bit i % 8 (MSB first)
                    lambda x, i=i: bin(int(str(x), 16))[2:].zfill(8)[i % 8]
                )
value.drop(
columns=[f'D{i}' for i in range(dlc)],
inplace=True
)
bin_dictionary[key] = value
for key in keys_to_drop:
del bin_dictionary[key]
return bin_dictionary
if dataformat == 'hex':
    bin_dictionary = hex_to_bin(sublog_dictionary)
if dataformat == 'dec':
    pass  # Decimal logs are not handled yet; see the sketch below
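The 'dec' branch is a placeholder for logs whose data bytes are recorded in decimal. Under that assumption, the only change to the conversion would be parsing each byte with base 10 instead of base 16; the helper below is a hypothetical sketch, not part of the original pipeline.
# Hypothetical per-byte helper for decimal-valued data bytes (illustration only)
def byte_to_bits_dec(x, i):
    """Return bit i % 8 (MSB first) of a byte given as a decimal string."""
    return bin(int(str(x), 10))[2:].zfill(8)[i % 8]

print(byte_to_bits_dec('142', 0))  # '1', since 142 == 0x8e == 0b10001110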