AVOA with RL: Hybrid AVOA + PPO for the Unit Commitment Problem (UCP)

pip install stable-baselines3[extra] gymnasium numpy matplotlib
# hybrid_avoa_ppo_ucp.py
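# Note: stable-baselines3 >= 2.0 targets Gymnasium (the maintained fork of
# OpenAI Gym), which is why this script imports gymnasium rather than gym.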
import random
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# ---------------------------
# 0. Problem data (classic 10-unit, 24-hour UCP benchmark, often quoted
#    together with the IEEE 39-bus system)
# ---------------------------
num_units = 10
num_hours = 24

# Sign convention: +h means the unit has already been ON for h hours,
# -h means it has been OFF for h hours.
initial_status_hours = [8,8,8,-8,-8,-8,8,-8,-8,-8]

Pmin = np.array([150,150,20,20,25,20,25,10,10,10], dtype=float)
Pmax = np.array([455,455,130,130,162,80,85,55,55,55], dtype=float)

# Quadratic fuel-cost coefficients: F_i(p) = a_i*p^2 + b_i*p + c_i  ($/h)
a_coeff = np.array([0.00048,0.00031,0.00200,0.00211,0.00398,0.00712,0.00079,0.00413,0.00222,0.00173])
b_coeff = np.array([16.19,17.26,16.60,16.50,19.70,22.26,27.74,25.92,27.27,27.79])
c_coeff = np.array([1000,970,700,680,450,370,480,660,665,670])

# Start-up costs ($): hot start if the unit has been off for at most
# T_MD + T_cold hours, cold start otherwise.
SU_hot  = np.array([4500,5000,550,560,900,170,260,30,30,30], dtype=float)
SU_cold = np.array([9000,10000,1100,1120,1800,340,520,60,60,60], dtype=float)
T_cold  = np.array([5]*num_units, dtype=int)

T_MU = np.array([8,8,5,5,6,3,3,1,1,1], dtype=int)
T_MD = np.array([8,8,5,5,6,3,3,1,1,1], dtype=int)

load_demand = np.array([700,750,850,950,1000,1100,1150,1200,
                        1300,1400,1450,1500,1400,1300,1200,1050,
                        1000,1100,1200,1400,1300,1100,900,800], dtype=float)

reserve_pct = 0.10  # spinning reserve, as a fraction of hourly demand

# ---------------------------
# 1. Lambda iteration (ELD)
# ---------------------------
def lambda_dispatch(schedule):
    # schedule: np.array (num_units, num_hours) binary
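    # Equal incremental cost dispatch: for quadratic cost
    # F_i(p) = a_i*p^2 + b_i*p + c_i the marginal cost is 2*a_i*p + b_i,
    # so at a common price lam each ON unit produces
    # p_i = (lam - b_i) / (2*a_i), clipped to [Pmin_i, Pmax_i].
    # Bisection on lam drives the total output to the hourly target.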
    dispatch = np.zeros_like(schedule, dtype=float)
    for t in range(num_hours):
        # simplification: the reserve is dispatched as energy here, i.e.
        # units are driven to produce load + reserve rather than just the load
        eff_load = load_demand[t] * (1 + reserve_pct)
        on_units = np.where(schedule[:,t] == 1)[0]
        if on_units.size == 0:
            continue
        lam_lo, lam_hi = 0.0, 1e5
        gen = np.zeros(on_units.size)
        for _ in range(200):
            lam = 0.5*(lam_lo+lam_hi)
            for idx, i in enumerate(on_units):
                Pi = (lam - b_coeff[i])/(2*a_coeff[i])
                gen[idx] = np.clip(Pi, Pmin[i], Pmax[i])
            total = gen.sum()
            if abs(total - eff_load) < 1e-2:
                break
            if total > eff_load:
                lam_hi = lam
            else:
                lam_lo = lam
        for idx, i in enumerate(on_units):
            dispatch[i,t] = gen[idx]
    return dispatch

# ---------------------------
# 2. Total cost function (fitness)
# ---------------------------
def calculate_total_cost(schedule):
    # schedule: np.array (num_units, num_hours) binary
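    # Total cost = fuel cost of all ON units + start-up costs + soft
    # penalties (unserved effective load, early shut-downs/start-ups).
    # Min-up/min-down are enforced via penalties here, not as hard limits.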
    status = np.array([1 if h>0 else 0 for h in initial_status_hours], dtype=int)
    T_on  = np.array([h if h>0 else 0 for h in initial_status_hours], dtype=int)
    T_off = np.array([-h if h<0 else 0 for h in initial_status_hours], dtype=int)
    cost = 0.0
    dispatch = lambda_dispatch(schedule)

    for t in range(num_hours):
        eff_load = load_demand[t] * (1 + reserve_pct)
        total_gen = dispatch[:,t].sum()
        if abs(total_gen - eff_load) > 1e-2:
            cost += 1e5  # high penalty (adjustable)

        for i in range(num_units):
            u = int(schedule[i,t])
            p = dispatch[i,t]
            if u == 1:
                cost += a_coeff[i]*p*p + b_coeff[i]*p + c_coeff[i]
                # start-up cost: hot start if the unit has been off for at
                # most T_MD + T_cold hours (the usual convention for this
                # benchmark), cold start otherwise
                if status[i] == 0:
                    if T_off[i] < T_MD[i]:
                        cost += 1e4  # min-down violation penalty (adjustable)
                    cost += SU_hot[i] if T_off[i] <= T_MD[i] + T_cold[i] else SU_cold[i]
                T_on[i] = T_on[i] + 1 if status[i]==1 else 1
                T_off[i] = 0
            else:
                if status[i] == 1 and T_on[i] < T_MU[i]:
                    cost += 1e4  # min-up penalty (adjustable)
                T_off[i] = T_off[i] + 1 if status[i]==0 else 1
                T_on[i] = 0
            status[i] = u
    return cost

# ---------------------------
# 3. Compact AVOA-inspired search (baseline explorer)
#    - A simplified stand-in for the full African Vultures Optimization
#      Algorithm: only best-solution tracking and random bit-flip mutation
#      are kept; it produces a candidate best schedule for PPO to refine.
# ---------------------------
def greedy_seed():
    """Greedy seed: commit cheapest units first to cover effective load each hour."""
    schedule = np.zeros((num_units, num_hours), dtype=int)
    rank = np.argsort(b_coeff)  # simple ranking by b_coeff
    for t in range(num_hours):
        eff = load_demand[t] * (1 + reserve_pct)
        total = 0.0
        for i in rank:
            if total < eff:
                schedule[i,t] = 1
                total += Pmax[i]  # count committed capacity (Pmax, not Pmin)
            else:
                schedule[i,t] = 0
    return schedule

def generate_population(pop_size=20):
    pop = []
    pop.append(greedy_seed())
    for _ in range(pop_size-1):
        cand = np.random.randint(0,2,(num_units,num_hours))
        # min-up/down violations in random candidates are left to the
        # fitness penalties; optionally pre-clean them with the
        # repair_min_times sketch defined below
        pop.append(cand)
    return pop
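
# Optional repair operator (a minimal sketch, not wired into the search by
# default): sweep each unit's timeline and suppress any switch that would
# violate its minimum up/down time, extending the current run instead.
def repair_min_times(schedule):
    sched = schedule.copy()
    for i in range(num_units):
        status = 1 if initial_status_hours[i] > 0 else 0
        run = abs(initial_status_hours[i])  # length of the current ON/OFF run
        for t in range(num_hours):
            if sched[i, t] != status:
                min_run = T_MU[i] if status == 1 else T_MD[i]
                if run < min_run:
                    sched[i, t] = status  # too early to switch: keep state
                    run += 1
                    continue
                status = sched[i, t]
                run = 1
            else:
                run += 1
    return sched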

def avoa_search(pop_size=20, max_iter=100):
    pop = generate_population(pop_size)
    fitness = [calculate_total_cost(p) for p in pop]
    best_idx = int(np.argmin(fitness))
    best = pop[best_idx].copy()
    best_cost = fitness[best_idx]
    fitness_history = [best_cost]

    for it in range(max_iter):
        # mutation step: each child flips a small random fraction of its
        # parent's bits (p=0.02 over 10x24 bits, i.e. ~5 flips per child)
        children = []
        for k in range(pop_size):
            child = pop[k].copy()
            flips = np.random.binomial(1, 0.02, child.shape).astype(bool)
            child[flips] = 1 - child[flips]
            children.append(child)
        children[0] = best.copy()  # elitism: keep the incumbent solution
        pop = children
        fitness = [calculate_total_cost(p) for p in pop]
        curr_best_idx = int(np.argmin(fitness))
        if fitness[curr_best_idx] < best_cost:
            best_cost = fitness[curr_best_idx]
            best = pop[curr_best_idx].copy()
        fitness_history.append(best_cost)
        if (it+1) % 10 == 0:
            print(f"[AVOA] it {it+1}/{max_iter} best_cost = {best_cost:.2f}")

    return best, best_cost, fitness_history

# ---------------------------
# 4. Gym Environment for PPO refinement
#    - The agent will receive a partial observation and decide the ON/OFF vector for each hour sequentially.
# ---------------------------
class UCPRefineEnv(gym.Env):
    """
    Gymnasium env to refine a full schedule produced by AVOA.
    The agent sets the commitment for one hour at a time (action is MultiBinary(num_units)).
    Episode length: num_hours. The reward is sparse: zero at every step except
    the last, which returns the negative total cost of the resulting schedule.
    """
    metadata = {'render_modes': ['human']}

    def __init__(self, seed_schedule=None):
        super().__init__()
        self.num_units = num_units
        self.num_hours = num_hours
        # action: binary vector of size num_units
        self.action_space = spaces.MultiBinary(self.num_units)
        # observation: current hour (scalar), current unit statuses, T_on, T_off, next load (normalized)
        # we'll flatten to a 1D vector
        obs_dim = 1 + self.num_units + self.num_units + self.num_units + 3  # hour + status + Ton + Toff + next 3 loads
        self.observation_space = spaces.Box(low=-1000.0, high=100000.0, shape=(obs_dim,), dtype=np.float32)

        # schedule to refine; if None, start with greedy seed
        self.seed_schedule = seed_schedule.copy() if seed_schedule is not None else greedy_seed()
        self.reset()

    def reset(self, *, seed=None, options=None):
        # Gymnasium API: reset takes seed/options and returns (obs, info)
        super().reset(seed=seed)
        # initialize state trackers from initial_status_hours
        self.status = np.array([1 if h>0 else 0 for h in initial_status_hours], dtype=int)
        self.T_on = np.array([h if h>0 else 0 for h in initial_status_hours], dtype=int)
        self.T_off = np.array([-h if h<0 else 0 for h in initial_status_hours], dtype=int)
        # working schedule starts from the seed
        self.working = self.seed_schedule.copy()
        self.t = 0
        return self._get_obs(), {}

    def _get_obs(self):
        # include next up to 3 loads normalized (pad with zeros at end)
        next_loads = []
        for k in range(3):
            idx = self.t + k
            if idx < num_hours:
                next_loads.append(load_demand[idx] / max(load_demand))
            else:
                next_loads.append(0.0)
        obs = np.concatenate([
            np.array([self.t], dtype=float),
            self.status.astype(float),
            self.T_on.astype(float),
            self.T_off.astype(float),
            np.array(next_loads, dtype=float)
        ])
        return obs.astype(np.float32)

    def step(self, action):
        # action: binary array length num_units -> set commitments for hour t
        action = np.array(action).astype(int).reshape(self.num_units)
        # enforce min-up/min-down in a simple manner: if action attempts illegal switch, keep previous
        for i in range(self.num_units):
            if action[i] == 1 and self.status[i] == 0 and self.T_off[i] < T_MD[i]:
                action[i] = 0
            if action[i] == 0 and self.status[i] == 1 and self.T_on[i] < T_MU[i]:
                action[i] = 1
        # write into working schedule
        self.working[:, self.t] = action
        # update trackers
        for i in range(self.num_units):
            if action[i] == 1:
                self.T_on[i] = self.T_on[i] + 1 if self.status[i]==1 else 1
                self.T_off[i] = 0
            else:
                self.T_off[i] = self.T_off[i] + 1 if self.status[i]==0 else 1
                self.T_on[i] = 0
            self.status[i] = action[i]

        self.t += 1
        terminated = (self.t >= self.num_hours)
        truncated = False  # Gymnasium API: step returns a 5-tuple
        reward = 0.0
        info = {}
        if terminated:
            # negative cost as reward (sparse, end-of-episode only)
            total_cost = calculate_total_cost(self.working)
            reward = -total_cost
            info['total_cost'] = total_cost
        return self._get_obs(), reward, terminated, truncated, info

    def render(self, mode='human'):
        print("t:", self.t)

# ---------------------------
# 5. Hybrid flow: Run AVOA, then train PPO to refine schedule, then refine & compare.
# ---------------------------
def hybrid_run():
    # 1) Run AVOA to get candidate best_schedule
    print("Running AVOA baseline search...")
    best_schedule, best_cost, hist = avoa_search(pop_size=30, max_iter=120)  # tune these
    print(f"AVOA best cost: {best_cost:.2f}")

    # plot AVOA history
    plt.figure(figsize=(7,3))
    plt.plot(hist, '-o')
    plt.xlabel('AVOA iterations')
    plt.ylabel('Best cost')
    plt.title('AVOA Convergence (baseline)')
    plt.grid(True)
    plt.show()

    # 2) Build refine environment seeded with AVOA schedule
    env = UCPRefineEnv(seed_schedule=best_schedule)
    check_env(env)  # optional sanity check

    # 3) Train PPO agent
    print("Training PPO agent to refine schedule...")
    model = PPO("MlpPolicy", env, verbose=1)
    # small illustrative budget; increase total_timesteps for real runs
    # (the sparse end-of-episode reward is slow for PPO to learn from)
    model.learn(total_timesteps=20000)
    model.save("ppo_ucp_refine")

    # 4) Use trained agent to refine schedule
    print("Refining schedule using trained PPO agent...")
    env_eval = UCPRefineEnv(seed_schedule=best_schedule)
    obs, _ = env_eval.reset()
    done = False
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env_eval.step(action)
        done = terminated or truncated
    refined = env_eval.working
    refined_cost = calculate_total_cost(refined)

    print(f"AVOA cost = {best_cost:.2f}, refined cost = {refined_cost:.2f}")
    if refined_cost < best_cost:
        print("Refined schedule improved cost. Accepting refined schedule.")
        final_schedule = refined
        final_cost = refined_cost
    else:
        print("Refined schedule did NOT improve. Keeping AVOA schedule.")
        final_schedule = best_schedule
        final_cost = best_cost

    # 5) Display result summaries
    print("\nFinal cost:", final_cost)
    # simple visualization: unit commitments heatmap
    plt.figure(figsize=(10,4))
    plt.imshow(final_schedule, aspect='auto', cmap='Greys', interpolation='nearest')
    plt.xlabel('Hour')
    plt.ylabel('Unit')
    plt.title('Final Unit Commitment (1=ON,0=OFF)')
    plt.colorbar(label='ON/OFF')
    plt.show()

    return best_schedule, best_cost, refined, refined_cost, final_schedule, final_cost

# ---------------------------
# 6. Run the hybrid pipeline
# ---------------------------
if __name__ == "__main__":
    best_avoa, cost_avoa, refined_sched, cost_refined, final_sched, final_cost = hybrid_run()