AVOA WITH RL
# Install dependencies first (note: the script imports gymnasium, not the legacy gym):
#   pip install "stable-baselines3[extra]" gymnasium numpy matplotlib

# hybrid_avoa_ppo_ucp.py
import random
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
# ---------------------------
# 0. Problem data (IEEE-39 10-unit example)
# ---------------------------
num_units = 10
num_hours = 24
initial_status_hours = [8,8,8,-8,-8,-8,8,-8,-8,-8]
Pmin = np.array([150,150,20,20,25,20,25,10,10,10], dtype=float)
Pmax = np.array([455,455,130,130,162,80,85,55,55,55], dtype=float)
a_coeff = np.array([0.00048,0.00031,0.00200,0.00211,0.00398,0.00712,0.00079,0.00413,0.00222,0.00173])
b_coeff = np.array([16.19,17.26,16.60,16.50,19.70,22.26,27.74,25.92,27.27,27.79])
c_coeff = np.array([1000,970,700,680,450,370,480,660,665,670])
SU_hot = np.array([4500,5000,550,560,900,170,260,30,30,30], dtype=float)
SU_cold = np.array([9000,10000,1100,1120,1800,340,520,60,60,60], dtype=float)
T_cold = np.array([5]*num_units, dtype=int)
T_MU = np.array([8,8,5,5,6,3,3,1,1,1], dtype=int)
T_MD = np.array([8,8,5,5,6,3,3,1,1,1], dtype=int)
load_demand = np.array([700,750,850,950,1000,1100,1150,1200,
1300,1400,1450,1500,1400,1300,1200,1050,
1000,1100,1200,1400,1300,1100,900,800], dtype=float)
reserve_pct = 0.10
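# Note on the reserve convention used throughout (a simplification of this
# write-up, not a universal rule): every hour targets
# load_demand[t] * (1 + reserve_pct) as the dispatch total, i.e. the 10%
# reserve is dispatched as extra energy rather than held as capacity headroom.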
# ---------------------------
# 1. Lambda iteration (ELD)
# ---------------------------
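# Why lambda iteration works: for quadratic costs C_i(P) = a_i*P^2 + b_i*P + c_i,
# minimizing sum_i C_i(P_i) subject to sum_i P_i = D gives the
# equal-incremental-cost condition dC_i/dP_i = 2*a_i*P_i + b_i = lambda for
# every ON unit, hence P_i = (lambda - b_i) / (2*a_i), clipped to
# [Pmin_i, Pmax_i]. Total output is monotone in lambda, so bisection on
# lambda recovers the demand-matching value.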
def lambda_dispatch(schedule):
    # schedule: np.array (num_units, num_hours) binary
    dispatch = np.zeros_like(schedule, dtype=float)
    for t in range(num_hours):
        eff_load = load_demand[t] * (1 + reserve_pct)
        on_units = np.where(schedule[:, t] == 1)[0]
        if on_units.size == 0:
            continue
        lam_lo, lam_hi = 0.0, 1e5
        gen = np.zeros(on_units.size)
        for _ in range(200):
            lam = 0.5 * (lam_lo + lam_hi)
            for idx, i in enumerate(on_units):
                Pi = (lam - b_coeff[i]) / (2 * a_coeff[i])
                gen[idx] = np.clip(Pi, Pmin[i], Pmax[i])
            total = gen.sum()
            if abs(total - eff_load) < 1e-2:
                break
            if total > eff_load:
                lam_hi = lam
            else:
                lam_lo = lam
        for idx, i in enumerate(on_units):
            dispatch[i, t] = gen[idx]
    return dispatch
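# --- Optional sanity check (not part of the original script): dispatch one hour
# with every unit ON and confirm the total matches the reserve-adjusted load.
def _demo_lambda_dispatch():
    all_on = np.ones((num_units, num_hours), dtype=int)
    d = lambda_dispatch(all_on)
    t = 0
    print(f"hour {t}: dispatched {d[:, t].sum():.1f} MW "
          f"vs target {load_demand[t] * (1 + reserve_pct):.1f} MW")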
# ---------------------------
# 2. Total cost function (fitness)
# ---------------------------
def calculate_total_cost(schedule):
    # schedule: np.array (num_units, num_hours) binary
    status = np.array([1 if h > 0 else 0 for h in initial_status_hours], dtype=int)
    T_on = np.array([h if h > 0 else 0 for h in initial_status_hours], dtype=int)
    T_off = np.array([-h if h < 0 else 0 for h in initial_status_hours], dtype=int)
    cost = 0.0
    dispatch = lambda_dispatch(schedule)
    for t in range(num_hours):
        eff_load = load_demand[t] * (1 + reserve_pct)
        total_gen = dispatch[:, t].sum()
        if abs(total_gen - eff_load) > 1e-2:
            cost += 1e5  # demand/reserve shortfall penalty (adjustable)
        for i in range(num_units):
            u = int(schedule[i, t])
            p = dispatch[i, t]
            if u == 1:
                cost += a_coeff[i] * p * p + b_coeff[i] * p + c_coeff[i]
                # startup: hot start if the unit was off no longer than T_cold,
                # cold start otherwise
                if status[i] == 0:
                    cost += SU_hot[i] if T_off[i] <= T_cold[i] else SU_cold[i]
                    if T_off[i] < T_MD[i]:
                        cost += 1e4  # min-down penalty (adjustable)
                T_on[i] = T_on[i] + 1 if status[i] == 1 else 1
                T_off[i] = 0
            else:
                if status[i] == 1 and T_on[i] < T_MU[i]:
                    cost += 1e4  # min-up penalty (adjustable)
                T_off[i] = T_off[i] + 1 if status[i] == 0 else 1
                T_on[i] = 0
            status[i] = u
    return cost
# ---------------------------
# 3. Compact AVOA (baseline explorer)
# - This is a simplified AVOA to produce a candidate best schedule.
# ---------------------------
def greedy_seed():
    """Greedy seed: commit cheapest units first until committed capacity
    covers the effective (reserve-adjusted) load each hour."""
    schedule = np.zeros((num_units, num_hours), dtype=int)
    rank = np.argsort(b_coeff)  # simple merit order by linear cost coefficient
    for t in range(num_hours):
        eff = load_demand[t] * (1 + reserve_pct)
        total = 0.0
        for i in rank:
            if total < eff:
                schedule[i, t] = 1
                total += Pmax[i]  # accumulate capacity (Pmax, not Pmin) to cover the load
            else:
                schedule[i, t] = 0
    return schedule
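# --- Optional sanity check (not in the original post): verify the greedy seed
# commits enough capacity every hour and report its penalized total cost.
def _demo_greedy_seed():
    s = greedy_seed()
    cap = (s * Pmax[:, None]).sum(axis=0)       # committed capacity per hour
    need = load_demand * (1 + reserve_pct)      # reserve-adjusted load per hour
    print("capacity covers load every hour:", bool((cap >= need).all()))
    print(f"greedy seed cost: {calculate_total_cost(s):.2f}")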
def generate_population(pop_size=20):
    pop = [greedy_seed()]
    for _ in range(pop_size - 1):
        cand = np.random.randint(0, 2, (num_units, num_hours))
        # random candidates are left unrepaired; min-up/down violations are
        # discouraged via the penalty terms in calculate_total_cost
        pop.append(cand)
    return pop
def avoa_search(pop_size=20, max_iter=100):
    pop = generate_population(pop_size)
    fitness = [calculate_total_cost(p) for p in pop]
    best_idx = int(np.argmin(fitness))
    best = pop[best_idx].copy()
    best_cost = fitness[best_idx]
    fitness_history = [best_cost]
    for it in range(max_iter):
        # simplified 'AVOA-like' update: perturb each parent by flipping a
        # small random fraction of bits (2% per bit) as a crude exploration step
        children = []
        for k in range(pop_size):
            child = pop[k].copy()
            flips = np.random.binomial(1, 0.02, child.shape).astype(bool)
            child[flips] = 1 - child[flips]
            children.append(child)
        pop = children
        pop[0] = best.copy()  # elitism: keep the incumbent so the best never regresses
        fitness = [calculate_total_cost(p) for p in pop]
        curr_best_idx = int(np.argmin(fitness))
        if fitness[curr_best_idx] < best_cost:
            best_cost = fitness[curr_best_idx]
            best = pop[curr_best_idx].copy()
        fitness_history.append(best_cost)
        if (it + 1) % 10 == 0:
            print(f"[AVOA] it {it+1}/{max_iter} best_cost = {best_cost:.2f}")
    return best, best_cost, fitness_history
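# --- Optional quick smoke test (not in the original post): a tiny search
# budget to confirm the pipeline runs end-to-end before a full experiment.
def _demo_avoa_smoke():
    _, cost, _ = avoa_search(pop_size=8, max_iter=10)
    print(f"smoke-test best cost after 10 iterations: {cost:.2f}")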
# ---------------------------
# 4. Gymnasium environment for PPO refinement
# - The agent receives a partial observation and decides the ON/OFF vector for each hour sequentially.
# ---------------------------
class UCPRefineEnv(gym.Env):
    """
    Gymnasium env to refine a full schedule produced by AVOA.
    The agent sets the commitment for one hour at a time (action is
    MultiBinary(num_units)). Episode length: num_hours. Final reward =
    negative total cost of the resulting full schedule.
    """
    metadata = {'render_modes': ['human']}

    def __init__(self, seed_schedule=None):
        super().__init__()
        self.num_units = num_units
        self.num_hours = num_hours
        # action: binary vector of size num_units
        self.action_space = spaces.MultiBinary(self.num_units)
        # observation: current hour (scalar), unit statuses, T_on, T_off,
        # next 3 loads (normalized), flattened to a 1D vector
        obs_dim = 1 + 3 * self.num_units + 3
        self.observation_space = spaces.Box(low=-1000.0, high=100000.0,
                                            shape=(obs_dim,), dtype=np.float32)
        # schedule to refine; if None, start with the greedy seed
        self.seed_schedule = seed_schedule.copy() if seed_schedule is not None else greedy_seed()
        self.reset()

    def reset(self, seed=None, options=None):
        # Gymnasium API: reset returns (obs, info)
        super().reset(seed=seed)
        # initialize state trackers from initial_status_hours
        self.status = np.array([1 if h > 0 else 0 for h in initial_status_hours], dtype=int)
        self.T_on = np.array([h if h > 0 else 0 for h in initial_status_hours], dtype=int)
        self.T_off = np.array([-h if h < 0 else 0 for h in initial_status_hours], dtype=int)
        # working schedule starts from the seed
        self.working = self.seed_schedule.copy()
        self.t = 0
        return self._get_obs(), {}

    def _get_obs(self):
        # include the next up to 3 loads, normalized (zero-padded at the horizon)
        next_loads = []
        for k in range(3):
            idx = self.t + k
            if idx < num_hours:
                next_loads.append(load_demand[idx] / load_demand.max())
            else:
                next_loads.append(0.0)
        obs = np.concatenate([
            np.array([self.t], dtype=float),
            self.status.astype(float),
            self.T_on.astype(float),
            self.T_off.astype(float),
            np.array(next_loads, dtype=float),
        ])
        return obs.astype(np.float32)

    def step(self, action):
        # action: binary array of length num_units -> commitments for hour t
        action = np.array(action).astype(int).reshape(self.num_units)
        # enforce min-up/min-down simply: if the action attempts an illegal
        # switch, keep the previous status
        for i in range(self.num_units):
            if action[i] == 1 and self.status[i] == 0 and self.T_off[i] < T_MD[i]:
                action[i] = 0
            if action[i] == 0 and self.status[i] == 1 and self.T_on[i] < T_MU[i]:
                action[i] = 1
        # write into the working schedule
        self.working[:, self.t] = action
        # update trackers
        for i in range(self.num_units):
            if action[i] == 1:
                self.T_on[i] = self.T_on[i] + 1 if self.status[i] == 1 else 1
                self.T_off[i] = 0
            else:
                self.T_off[i] = self.T_off[i] + 1 if self.status[i] == 0 else 1
                self.T_on[i] = 0
            self.status[i] = action[i]
        self.t += 1
        terminated = (self.t >= self.num_hours)
        reward = 0.0
        info = {}
        if terminated:
            # negative cost as reward (sparse, end-of-episode signal)
            total_cost = calculate_total_cost(self.working)
            reward = -total_cost
            info['total_cost'] = total_cost
        # Gymnasium API: step returns (obs, reward, terminated, truncated, info)
        return self._get_obs(), reward, terminated, False, info

    def render(self, mode='human'):
        print("t:", self.t)
# ---------------------------
# 5. Hybrid flow: run AVOA, train PPO to refine the schedule, then compare.
# ---------------------------
def hybrid_run():
    # 1) Run AVOA to get a candidate best schedule
    print("Running AVOA baseline search...")
    best_schedule, best_cost, hist = avoa_search(pop_size=30, max_iter=120)  # tune these
    print(f"AVOA best cost: {best_cost:.2f}")
    # plot AVOA history
    plt.figure(figsize=(7, 3))
    plt.plot(hist, '-o')
    plt.xlabel('AVOA iterations')
    plt.ylabel('Best cost')
    plt.title('AVOA Convergence (baseline)')
    plt.grid(True)
    plt.show()
    # 2) Build the refinement environment seeded with the AVOA schedule
    env = UCPRefineEnv(seed_schedule=best_schedule)
    check_env(env)  # optional sanity check
    # 3) Train the PPO agent
    print("Training PPO agent to refine schedule...")
    model = PPO("MlpPolicy", env, verbose=1)
    # small budget to illustrate; increase total_timesteps for real experiments
    model.learn(total_timesteps=20000)
    model.save("ppo_ucp_refine")
    # 4) Use the trained agent to refine the schedule
    print("Refining schedule using trained PPO agent...")
    env_eval = UCPRefineEnv(seed_schedule=best_schedule)
    obs, _ = env_eval.reset()
    done = False
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env_eval.step(action)
        done = terminated or truncated
    refined = env_eval.working
    refined_cost = calculate_total_cost(refined)
    print(f"AVOA cost = {best_cost:.2f}, refined cost = {refined_cost:.2f}")
    if refined_cost < best_cost:
        print("Refined schedule improved cost. Accepting refined schedule.")
        final_schedule = refined
        final_cost = refined_cost
    else:
        print("Refined schedule did NOT improve. Keeping AVOA schedule.")
        final_schedule = best_schedule
        final_cost = best_cost
    # 5) Display result summaries
    print("\nFinal cost:", final_cost)
    # simple visualization: unit-commitment heatmap
    plt.figure(figsize=(10, 4))
    plt.imshow(final_schedule, aspect='auto', cmap='Greys', interpolation='nearest')
    plt.xlabel('Hour')
    plt.ylabel('Unit')
    plt.title('Final Unit Commitment (1=ON, 0=OFF)')
    plt.colorbar(label='ON/OFF')
    plt.show()
    return best_schedule, best_cost, refined, refined_cost, final_schedule, final_cost
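# --- Optional (standard Stable-Baselines3 usage, not in the original post):
# the saved policy can be reloaded later without retraining:
#   model = PPO.load("ppo_ucp_refine")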
# ---------------------------
# 6. Run
# ---------------------------
if __name__ == "__main__":
    best_avoa, cost_avoa, refined_sched, cost_refined, final_sched, final_cost = hybrid_run()