"""
CliMaPan-Lab: Climate-Pandemic Economic Modeling Laboratory
Main Economic Model Implementation
This module contains the core EconModel class that orchestrates the agent-based
simulation with climate and pandemic dynamics.
"""
import copy
import math
from collections import OrderedDict
from datetime import date, timedelta
import ambr as am
import numpy as np
from .banks.Bank import Bank
from .climate import Climate
from .consumers.Consumer import Consumer
from .firms.BrownEnergyFirm import BrownEnergyFirm
from .firms.CapitalGoodsFirm import CapitalGoodsFirm
from .firms.ConsumerGoodsFirm import ConsumerGoodsFirm
from .firms.GreenEnergyFirm import GreenEnergyFirm
from .governments.Goverment import Government
from .utils import _merge_edgelist, gini, listToArray, lognormal, normal
# ============================================================================
# EconModel
# ============================================================================
# Purpose:
# Quick "follow-through" of the full model loop so a reader can
# understand where each subsystem fits (economy, COVID, climate,
# finance, government).
#
# Time scale:
# - One model step = 1 day.
# - "Monthly" blocks run on the day before rollover to day 1 (i.e., when tomorrow is the 1st).
#
# Main Flow:
# 0) setup(): Initialize agents and state
# 1) step(): Daily execution with monthly economic cycles
# 2) update(): Record metrics for analysis
# 3) Helper routines: Markets, COVID, climate, policy
# ============================================================================
[docs]
class EconModel(am.Model):
[docs]
def setup(self):
"""Initialize the agents and network of the model."""
# ----------------------------------------
# Global / simulation-wide state
# ----------------------------------------
# Initiate variables
np.random.seed(self.p.seed)
# --- Population composition counters ---
self.num_worker = 0
self.num_owner = 0
# --- Prices / inflation / consumption variance ---
self.fossil_fuel_price = self.p.fossil_fuel_price
# self.expectedInflationRate = self.p.bankCredibility * self.p.targetInflation
self.consumption_var = self.p.consumption_var
# self.expectedInflationRateList = []
self.averagePriceList = [0.5]
# --- Firm failure / bankruptcy bookkeeping ---
self.numCSFirmBankrupt = 0
self.numCPFirmBankrupt = 0
self.bankrupt_count = 0
self.bankrupt_list = []
self.bankrupt_total_count = 0
# --- Banking & taxes ---
self.bankIL = self.p.bankIL
self.inflationRate = 0
self.totalCarbonTaxes = 0
self.totalTaxes = 0
self.inflationRateList = []
# --- Pandemic policy state ---
self.covidState = False
self.lockdown = False
self.lockdown_scale = self.p.lock_down_production_utilization
# --- Fiscal policy triggers ---
self.fiscalDate = np.inf
self.fiscal_count = 0
# --- Epidemiological aggregates (daily resolution) ---
self.num_infection = 0
self.num_death = 0
self.num_susceptible = 0
self.num_exposed = 0
self.num_mild = 0
self.num_severe = 0
self.num_critical = 0
self.num_recover = 0
# self.covid_infect = 0
self.covid_death = 0
self.covid_new = 0
# --- Accounting aggregates ---
self.total_good = 0
self.sale = 0
self.ksale = 0
self.cssale = 0
self.expenditure = 0
self.ue_gov = 0
# --- Calendrical state ---
self.month_no = 0
self.demand_fluctuation = 0
# --- Initial endowments & policy multipliers ---
self.owner_endownment = self.p.owner_endownment
self.worker_endownment = self.p.worker_endownment
self.alpha_h = self.p.alpha_h
self.alpha_f = self.p.alpha_f
# ----------------------------------------
# Agent creation and attributes
# ----------------------------------------
## Initiate consumer agents
self.consumer_agents = am.AgentList(self, self.p.c_agents, Consumer)
# Assign age groups with small random deviations
# Vectorized age group assignment
rands = np.random.normal(0, 0.1, len(self.consumer_agents))
self.consumer_agents.select(rands < -0.1).call("setAgeGroup", "young")
self.consumer_agents.select(rands > 0.15).call("setAgeGroup", "elderly")
self.consumer_agents.select((rands >= -0.1) & (rands <= 0.15)).call(
"setAgeGroup", "working"
)
# Assign working-age consumers into economic roles
count = 0
self.workingAgeConsumers = [
idx
for idx in range(len(self.consumer_agents))
if self.consumer_agents[idx].getAgeGroup() == "working"
]
for i in self.workingAgeConsumers:
if count < self.p.capitalists:
# Capitalists (general owners of CS/CP firms)
self.consumer_agents[i].setConsumerType("capitalists")
self.consumer_agents[i].owner = self.consumer_agents[
i
].consumerType not in ["workers", None]
self.consumer_agents[i].update_deposit(self.owner_endownment)
self.num_owner += 1
elif count < self.p.capitalists + self.p.green_energy_owners:
# Owners of GREEN energy sector
self.consumer_agents[i].setConsumerType("green_energy_owners")
self.consumer_agents[i].owner = self.consumer_agents[
i
].consumerType not in ["workers", None]
self.consumer_agents[i].update_deposit(self.owner_endownment)
self.num_owner += 1
elif (
count
< self.p.capitalists
+ self.p.green_energy_owners
+ self.p.brown_energy_owners
):
# Owners of BROWN energy sector
self.consumer_agents[i].setConsumerType("brown_energy_owners")
self.consumer_agents[i].owner = self.consumer_agents[
i
].consumerType not in ["workers", None]
self.consumer_agents[i].update_deposit(self.owner_endownment)
self.num_owner += 1
else:
# Residual working-age consumers become workers
self.consumer_agents[i].setConsumerType("workers")
self.num_worker += 1
self.consumer_agents[i].update_deposit(self.worker_endownment)
count += 1
# Alive (non-dead) population view
self.aliveConsumers = self.consumer_agents.select(
self.consumer_agents.getCovidStateAttr("state") != "dead"
)
self.aliveConsumers = self.aliveConsumers.select(
self.aliveConsumers.isDead() != True
)
## Initiate bank agents
self.bank_agents = am.AgentList(self, 1, Bank)
## Initiate Government agents
self.government_agents = am.AgentList(self, self.p.g_agents, Government)
## Initiate firm agents
### Consumption goods firms (CS)
self.csfirm_agents = am.AgentList(self, self.p.csf_agents, ConsumerGoodsFirm)
# Ensure we have at least one of each energy type if we have multiple firms
if len(self.csfirm_agents) >= 2:
# Assign first firm to brown, second to green, rest randomly
self.csfirm_agents[0].useEnergyType("brown")
self.csfirm_agents[0].brown_firm = True
self.csfirm_agents[1].useEnergyType("green")
self.csfirm_agents[1].brown_firm = False
# Vectorized assignment for remainder
rest_agents = am.AgentList(self, self.csfirm_agents[2:])
if len(rest_agents) > 0:
probs = np.random.uniform(0, 1, len(rest_agents))
brown_mask = probs < 0.5
brown_agents = rest_agents.select(brown_mask)
brown_agents.call("useEnergyType", "brown")
for agent in brown_agents:
agent.brown_firm = True # Manual attribute update
green_agents = rest_agents.select(~brown_mask)
green_agents.call("useEnergyType", "green")
for agent in green_agents:
agent.brown_firm = False
else:
# Single firm random assignment
if np.random.uniform(0, 1) < 0.5:
self.csfirm_agents.call("useEnergyType", "brown")
for agent in self.csfirm_agents:
agent.brown_firm = True
else:
self.csfirm_agents.call("useEnergyType", "green")
for agent in self.csfirm_agents:
agent.brown_firm = False
### Capital goods firms (CP)
self.cpfirm_agents = am.AgentList(self, self.p.cpf_agents, CapitalGoodsFirm)
# Ensure we have at least one of each energy type if we have multiple firms
# Ensure we have at least one of each energy type if we have multiple firms
if len(self.cpfirm_agents) >= 2:
# Assign first firm to brown, second to green, rest randomly
self.cpfirm_agents[0].useEnergyType("brown")
self.cpfirm_agents[0].capital = 5000
self.cpfirm_agents[0].brown_firm = True
self.cpfirm_agents[1].useEnergyType("green")
self.cpfirm_agents[1].capital = 4200
self.cpfirm_agents[1].brown_firm = False
# Vectorized assignment for remainder
rest_agents = am.AgentList(self, self.cpfirm_agents[2:])
if len(rest_agents) > 0:
probs = np.random.beta(3, 7, len(rest_agents))
brown_mask = probs < 0.5
brown_agents = rest_agents.select(brown_mask)
brown_agents.call("useEnergyType", "brown")
for agent in brown_agents:
agent.capital = 5000
agent.brown_firm = True
green_agents = rest_agents.select(~brown_mask)
green_agents.call("useEnergyType", "green")
for agent in green_agents:
agent.capital = 4200
agent.brown_firm = False
else:
# Single firm random assignment
if np.random.beta(3, 7) < 0.5:
self.cpfirm_agents.call("useEnergyType", "brown")
for agent in self.cpfirm_agents:
agent.capital = 5000
agent.brown_firm = True
else:
self.cpfirm_agents.call("useEnergyType", "green")
for agent in self.cpfirm_agents:
agent.capital = 4200
agent.brown_firm = False
### Energy firms
self.greenEFirm = am.AgentList(self, 1, GreenEnergyFirm)
self.brownEFirm = am.AgentList(self, 1, BrownEnergyFirm)
# Cluster goods firms
self.firms = self.csfirm_agents + self.cpfirm_agents
self.totalFirms = self.firms + self.greenEFirm + self.brownEFirm
# ----------------------------------------
# National accounts initialization
# ----------------------------------------
self.GDP = 0
for firm in self.totalFirms:
self.GDP += np.sum([firm.getSoldProducts() * firm.getPrice()])
self.sale += np.sum([firm.getSoldProducts() * firm.getPrice()])
for firm in self.csfirm_agents:
self.cssale += np.sum([firm.getSoldProducts() * firm.getPrice()])
for firm in self.cpfirm_agents:
self.ksale += np.sum([firm.getSoldProducts() * firm.getPrice()])
self.GDP += np.sum([self.expenditure])
# ----------------------------------------
# Climate module (optional)
# ----------------------------------------
if self.p.climateModuleFlag:
self.climateModule = am.AgentList(self, 1, Climate)
self.climateShockMode = copy.deepcopy(self.p.climateShockMode)
self.climateModule.initGDP(self.GDP)
# ----------------------------------------
# Initial values at time 0
# ----------------------------------------
self.gini = gini(
np.array(
[(self.consumer_agents.getWage()) + (self.consumer_agents.getIncome())]
)
)
self.consumption_gini = gini(np.array(self.consumer_agents.getConsumption()))
self.fossil_fuel_price = copy.copy(self.p.fossil_fuel_price)
self.today = date.fromisoformat(self.p.start_date) + timedelta(days=self.t - 1)
self.tomorrow = self.today + timedelta(days=1)
# ----------------------------------------
# Epidemic and fiscal policy timing
# ----------------------------------------
if not self.p.covid_settings:
self.covidStartDate = np.inf
self.fiscalDate = np.inf
else:
self.covidStartDate = (
date.fromisoformat(self.p.covid_start_date)
- date.fromisoformat(self.p.start_date)
).days # 7305
if self.p.settings in ["BAIL", "INJECTION", "S2BAU", "S3MOD"]:
self.fiscalDate = self.p.fiscal_time + self.covidStartDate
else:
self.fiscalDate = np.inf
[docs]
def step(self):
"""Define the models' events per simulation step."""
self.initiate_step()
# Check end of month
# Before COVID start date: only run monthly blocks on month rollover
if self.t <= self.covidStartDate:
if int(str(self.tomorrow).split("-")[-1]) == 1:
self.month_no += 1
self.stepwise_forecast()
self.stepwise_produce()
self.stepwise_after_production()
self.stepwise_termination()
else:
# After COVID starts: daily epidemic updates + monthly economic cycles
if not int(str(self.tomorrow).split("-")[-1]) == 1:
# Within-month day: only propagate COVID
if self.num_infection != 0:
self._propagate_covid()
else:
# Month rollover: run full economic cycle
self.month_no += 1
self.stepwise_forecast()
self.stepwise_produce()
self.stepwise_after_production()
# Trigger discretionary fiscal policy for selected scenarios
if (
(self.t >= self.fiscalDate)
and self.p.covid_settings
and self.p.settings in ["BAIL", "INJECTION", "S2BAU", "S3MOD"]
and self.fiscal_count < 3
):
self.fiscal_count += 1
self._fiscal_policy()
print("implement fiscal policy")
self.stepwise_termination()
[docs]
def initiate_step(self):
"""
This internal function of the model is used to reset temporary variables or
to accumulate variable every step
"""
# Advance calendar
self.today += timedelta(days=1)
self.tomorrow += timedelta(days=1)
# Refresh alive population views
self.aliveConsumers = self.aliveConsumers.select(
self.aliveConsumers.getCovidStateAttr("state") != "dead"
)
self.aliveConsumers = self.aliveConsumers.select(
self.aliveConsumers.isDead() != True
)
self.workingAgeConsumers = [
idx
for idx in range(len(self.aliveConsumers))
if self.aliveConsumers[idx].getAgeGroup() == "working"
]
# Daily demand fluctuation
self.demand_fluctuation = normal(1, self.consumption_var)
# Reset fiscal aggregates
self.totalCarbonTaxes = 0
self.totalTaxes = 0
self.bank_agents.reset_bank()
[self.csfirm_agents[i].setTax(0) for i in range(len(self.csfirm_agents))]
[self.cpfirm_agents[i].setTax(0) for i in range(len(self.cpfirm_agents))]
# Monthly fossil fuel price growth
if int(str(self.tomorrow).split("-")[-1]) == 1:
self.fossil_fuel_price *= np.sum(1 + self.p.fossil_fuel_price_growth_rate)
# Check covid start date
if self.t == self.covidStartDate:
self.covidState = True
self._init_covid_exposure()
# Reset contact every new day
if self.p.covid_settings:
# Count epidemiological states efficiently
from collections import Counter
states = [c.covidState["state"] for c in self.aliveConsumers]
counts = Counter(states)
self.num_susceptible = counts["susceptible"]
self.num_exposed = counts["exposed"]
self.num_mild = counts["mild"]
self.num_severe = counts["severe"]
self.num_critical = counts["critical"]
self.num_recover = counts["recovered"] + counts["immunized"]
self.num_death = counts["dead"]
# Total infection (all except susceptible, recovered, immunized, dead, None)
# effectively: exposed + mild + severe + critical + infected non-sympotomatic
self.num_infection = (
counts["exposed"]
+ counts["mild"]
+ counts["severe"]
+ counts["critical"]
+ counts["infected non-sympotomatic"]
)
# Terminate or continue Covid State:
# print("covid state", self.covidState)
if self.p.covid_settings:
if (self.num_infection / self.p.c_agents) <= 0.002:
self.covidState = False
else:
self.covidState = True
[docs]
def stepwise_forecast(self):
"""
This internal function of the model is used to make forecast for some of the
agents for the upcoming time step, if neccessary
"""
# Firm forecasting demand
self._csf_forecast_demand()
self._cpf_forecast_demand()
[
self.csfirm_agents[i].calculate_input_demand()
for i in range(len(self.csfirm_agents))
]
[
self.cpfirm_agents[i].calculate_input_demand()
for i in range(len(self.cpfirm_agents))
]
if self.t > 31:
[
self.csfirm_agents[i].production_budgeting()
for i in range(len(self.csfirm_agents))
]
[
self.cpfirm_agents[i].production_budgeting()
for i in range(len(self.cpfirm_agents))
]
# Energy firm demand
self._energy_demand()
self.brownEFirm.calculate_input_demand()
self.greenEFirm.calculate_input_demand()
if self.t > 1:
self.brownEFirm.production_budgeting()
self.greenEFirm.production_budgeting()
# Consumer Demand
if self.t > 31:
for i in self.workingAgeConsumers:
self.aliveConsumers[i].desired_C()
[docs]
def stepwise_produce(self):
"""
This internal function of the model is used to propagate the production of the firm
agents
"""
# Energy market opens
self.brownEFirm.produce()
self.greenEFirm.produce()
self.brownEFirm.price_setting()
self.greenEFirm.price_setting()
# Labour market opens
self._hire()
# Check covid start date
if self.t > self.covidStartDate and self.num_infection != 0:
self._propagate_covid()
# [firm.hire() for firm in np.random.permutation(self.firms)]
# We probably need a function to make sure unemployed ppl receive benefit here
# Goods and Capital firm opens
[self.csfirm_agents[i].produce() for i in range(len(self.csfirm_agents))]
[self.cpfirm_agents[i].produce() for i in range(len(self.cpfirm_agents))]
# Scenario S3MOD: temporary lumpsum that raises green CS output
if (
(self.t >= self.fiscalDate)
and self.p.covid_settings
and self.p.settings == "S3MOD"
and self.fiscal_count < 3
):
print(
len(
self.csfirm_agents.select(
self.csfirm_agents.getUseEnergy() == "green"
)
)
)
for i in range(
len(
self.csfirm_agents.select(
self.csfirm_agents.getUseEnergy() == "green"
)
)
):
print("lumpsum_update")
print(self.csfirm_agents[i].get_actual_production())
self.csfirm_agents[i].update_actual_production(
self.p.lumpSum / self.csfirm_agents[i].getPrice()
)
print(self.csfirm_agents[i].get_actual_production())
[docs]
def stepwise_after_production(self, eps=1e-8):
"""
This internal function of the model is used to do jobs after production
"""
# Firms transaction and accounting
self.bank_agents.sommaW()
[self.csfirm_agents[i].price_setting() for i in range(len(self.csfirm_agents))]
[self.cpfirm_agents[i].price_setting() for i in range(len(self.cpfirm_agents))]
self._csf_transaction()
self._cpf_transaction()
# CS firms: profits, capital updates, NPL accounting
for i in range(len(self.csfirm_agents)):
self.csfirm_agents[i].compute_net_profit()
self.csfirm_agents[i].update_capital_growth()
self.bank_agents.NPL += self.csfirm_agents[i].non_loan
self.bank_agents.profit -= (1 + self.bankIL) * self.csfirm_agents[
i
].non_loan
self.csfirm_agents[i].reset_non_loan()
# CP firms: profits, capital updates, NPL accounting
for i in range(len(self.cpfirm_agents)):
self.cpfirm_agents[i].compute_net_profit()
self.cpfirm_agents[i].update_capital_growth()
self.bank_agents.NPL += self.cpfirm_agents[i].non_loan
self.cpfirm_agents[i].reset_non_loan()
self.bank_agents.profit -= (1 + self.bankIL) * self.csfirm_agents[i].non_loan
if self.p.verboseFlag:
print("____bank profit", self.bank_agents.profit)
print("___bank DTE", self.bank_agents.DTE)
# print("capital firm growth", self.cpfirm_agents[i].capital, self.cpfirm_agents[i].capital_growth)
print("total non loan", self.bank_agents.NPL)
# Energy firms: profits and capital growth
self.brownEFirm.compute_net_profit()
self.greenEFirm.compute_net_profit()
self.brownEFirm.update_capital_growth()
self.greenEFirm.update_capital_growth()
self.totalFirms.update_capital_value()
## Accounting owner's income
self.capitalistsIncome = (
(
sum([i for i in self.cpfirm_agents.getOwnerIncome() if i >= 0])
+ sum([i for i in self.csfirm_agents.getOwnerIncome() if i >= 0])
)
/ self.p.capitalists
* (1 - self.p.incomeTaxRate)
)
# print("capitalist income", self.capitalistsIncome)
self.totalTaxes += (
(
sum([i for i in self.cpfirm_agents.getOwnerIncome() if i >= 0])
+ sum([i for i in self.csfirm_agents.getOwnerIncome() if i >= 0])
)
/ self.p.capitalists
* self.p.incomeTaxRate
)
self.greenEnergyOwnersIncome = (
(sum([i for i in self.greenEFirm.getOwnerIncome() if i >= 0]))
/ self.p.green_energy_owners
* (1 - self.p.incomeTaxRate)
)
self.totalTaxes += (
(sum([i for i in self.greenEFirm.getOwnerIncome() if i >= 0]))
/ self.p.green_energy_owners
* self.p.incomeTaxRate
)
# print("green income", self.greenEnergyOwnersIncome)
self.brownEnergyOwnersIncome = (
(sum([i for i in self.brownEFirm.getOwnerIncome() if i >= 0]))
/ self.p.brown_energy_owners
* (1 - self.p.incomeTaxRate)
)
# print("brown income", self.brownEnergyOwnersIncome)
self.totalTaxes += (
(sum([i for i in self.brownEFirm.getOwnerIncome() if i >= 0]))
/ self.p.brown_energy_owners
* self.p.incomeTaxRate
)
## Climate progression
if self.p.climateModuleFlag:
self.climateModule.progress(self.totalFirms)
self._induce_climate_shock()
if self.t == 31:
self.climateModule.initAggregatedIncome()
## Accounting taxes and corresponding policies
self._carbon_tax_policy()
self.government_agents.update_budget(self.totalTaxes)
## Accounting consumers
for i in self.workingAgeConsumers:
if self.aliveConsumers[i].getConsumerType() == "capitalists":
self.aliveConsumers[i].setDiv(self.capitalistsIncome)
elif (
self.p.energySectorFlag
and self.aliveConsumers[i].getConsumerType() == "green_energy_owners"
):
self.aliveConsumers[i].setDiv(self.greenEnergyOwnersIncome)
elif (
self.p.energySectorFlag
and self.aliveConsumers[i].getConsumerType() == "brown_energy_owners"
):
self.aliveConsumers[i].setDiv(self.brownEnergyOwnersIncome)
self.aliveConsumers.update_wealth() # this might be wrong, pay attention
[docs]
def stepwise_termination(self):
"""
This internal function of the model is used to clean up and summarize stepwise variables
"""
self.bankrupt_count = 0
## Check firms insolvency and update firms after bankruptcy, and bank injection
if self.t > 365:
self.csfirm_agents.setBankruptcy()
self.cpfirm_agents.setBankruptcy()
if len(self.bankrupt_list) > 11:
self.bankrupt_list.pop(0)
self.bankrupt_list.append(self.bankrupt_count)
self.bankrupt_total_count = np.sum(self.bankrupt_list)
# print(self.bankrupt_total_count)
## Bank and Government accounting
self.expenditure = np.sum(self.government_agents.E_Gov())
self.ue_gov = np.sum(self.government_agents.UE_Gov())
# Rebuild national accounts
self.GDP = 0
self.cssale = 0
self.ksale = 0
for firm in self.totalFirms:
self.GDP += np.sum([firm.getSoldProducts() * firm.getPrice()])
for firm in self.csfirm_agents:
self.cssale += np.sum([firm.getSoldProducts() * firm.getPrice()])
for firm in self.cpfirm_agents:
self.ksale += np.sum([firm.getSoldProducts() * firm.getPrice()])
self.GDP += np.sum([self.expenditure])
# Update inequality metrics
income_combined = (
self.aliveConsumers.getWage() + self.aliveConsumers.getIncome()
)
self.gini = gini(income_combined)
self.consumption_gini = gini(self.aliveConsumers.getConsumption())
# Reset lockdown flags
self.csfirm_agents.resetLockDown()
self.cpfirm_agents.resetLockDown()
[docs]
def update(self, eps=1e-8):
"""Record metrics for analysis"""
super().update()
if int(str(self.tomorrow).split("-")[-1]) == 1:
# Monthly recording of all major indicators
# Record date as string for better compatibility
self.record("date", str(self.today))
self.record("GDP", float(self.GDP)) # Ensure float scalar
self.record("Gini", float(self.gini)) # Ensure float scalar
self.record("People", int(len(self.aliveConsumers)))
self.record("Gini Consumption", float(self.consumption_gini))
# For array-like data, convert to Python lists to avoid Polars/numpy interaction issues with sparse data
# ambr handles list of lists better than list of numpy arrays mixed with None
self.record(
"UnemplDole",
listToArray(self.aliveConsumers.getWage())[
self.aliveConsumers.getWage() == self.p.unemploymentDole
].tolist(),
)
self.record("Unemployment Expenditure", float(self.ue_gov))
self.record(
"Owners Income", listToArray(self.aliveConsumers.getDiv()).tolist()
)
self.record("Wage", listToArray(self.aliveConsumers.getWage()).tolist())
# self.record('Average Income', listToArray( np.mean(self.aliveConsumers.getIncome())))
self.record(
"Employed", listToArray(self.aliveConsumers.isEmployed()).tolist()
)
self.record(
"Consumer Type",
listToArray(self.aliveConsumers.getConsumerType()).tolist(),
)
self.record(
"UnemploymentRate",
float(
np.sum(
listToArray(self.aliveConsumers.getUnemploymentState()), axis=0
)
/ (self.p.c_agents - self.num_owner)
),
)
self.record(
"Consumption",
listToArray(self.aliveConsumers.getConsumption()).tolist(),
)
self.record(
"Desired Consumption",
listToArray(self.aliveConsumers.get_desired_consumption()).tolist(),
)
# Bank metrics
self.record("Loans", listToArray(self.bank_agents.loans).tolist())
self.record(
"Bank totalLoanSupply",
listToArray(self.bank_agents.totalLoanSupply).tolist(),
)
self.record("Bank Equity", listToArray(self.bank_agents.equity).tolist())
self.record(
"Bank Deposits", listToArray(self.bank_agents.deposits).tolist()
)
self.record(
"Bank LDR",
listToArray(
self.bank_agents.loans / (self.bank_agents.deposits + eps)
).tolist(),
)
self.record(
"Bank Loan Demands",
listToArray(self.bank_agents.totalLoanDemands).tolist(),
)
self.record(
"Bank Loan Over Equity",
listToArray(
self.bank_agents.actualSuppliedLoan
/ (self.bank_agents.equity + eps)
).tolist(),
)
self.record("Bank DTE", listToArray(self.bank_agents.DTE).tolist())
self.record(
"Non Performing Loan", listToArray(self.bank_agents.NPL).tolist()
)
# self.record('Expected Inflation Rate', listToArray(self.expectedInflationRateList))
self.record("Inflation Rate", listToArray(self.inflationRateList).tolist())
self.record(
"Total Loan Demand",
listToArray(self.bank_agents.totalLoanDemands).tolist(),
)
# CS Firm metrics
self.record("CS Num Bankrupt", int(self.numCSFirmBankrupt))
self.record(
"CS V Cost",
listToArray(self.csfirm_agents.get_average_production_cost()).tolist(),
)
self.record(
"CS U Cost",
listToArray(self.csfirm_agents.get_average_production_cost()).tolist(),
)
self.record(
"CS Firm Loans", listToArray(self.csfirm_agents.loanObtained).tolist()
)
self.record(
"CS Net Profits", listToArray(self.csfirm_agents.net_profit).tolist()
)
self.record(
"CS Capital", listToArray(self.csfirm_agents.get_capital()).tolist()
)
self.record(
"CS Net Worth", listToArray(self.csfirm_agents.getNetWorth()).tolist()
)
self.record(
"CS Number of Workers",
listToArray(self.csfirm_agents.countWorkers).tolist(),
)
self.record(
"CS Number of Consumers",
listToArray(self.csfirm_agents.countConsumers).tolist(),
)
self.record("CS Price", listToArray(self.csfirm_agents.getPrice()).tolist())
self.record(
"CS Sold Products",
listToArray(self.csfirm_agents.getSoldProducts()).tolist(),
)
self.record("CS Sale", listToArray(self.cssale).tolist())
self.record("CS iL", listToArray(self.csfirm_agents.iL).tolist())
self.record("CS iF", listToArray(self.csfirm_agents.iF).tolist())
self.record(
"CS Loan Obtained",
listToArray(self.csfirm_agents.loanObtained).tolist(),
)
self.record(
"CS Deposit", listToArray(self.csfirm_agents.getDeposit()).tolist()
)
self.record(
"CS Margin", listToArray(self.csfirm_agents.profit_margin).tolist()
)
self.record(
"CS Capital Investment",
listToArray(self.csfirm_agents.get_capital_investment()).tolist(),
)
self.record(
"CS Production Cost",
listToArray(
self.csfirm_agents.get_average_production_cost()
* self.csfirm_agents.get_actual_production()
).tolist(),
)
self.record(
"CS Capacity",
listToArray(self.csfirm_agents.get_actual_production()).tolist(),
)
self.record(
"CS Wage Bill", listToArray(self.csfirm_agents.wage_bill).tolist()
)
self.record(
"CS Loan Payment", listToArray(self.csfirm_agents.payback).tolist()
)
self.record(
"CS Credit Default Risk",
listToArray(self.csfirm_agents.defaultProb).tolist(),
)
# CP Firm metrics
self.record(
"CP Credit Default Risk",
listToArray(self.cpfirm_agents.defaultProb).tolist(),
)
self.record("CP Num Bankrupt", int(self.numCPFirmBankrupt))
self.record(
"CP Firm Loans", listToArray(self.cpfirm_agents.loanObtained).tolist()
)
self.record(
"CP Net Profits", listToArray(self.cpfirm_agents.net_profit).tolist()
)
self.record(
"CP Net Worth", listToArray(self.cpfirm_agents.getNetWorth()).tolist()
)
self.record(
"CP Capital", listToArray(self.cpfirm_agents.get_capital()).tolist()
)
self.record("CP Price", listToArray(self.cpfirm_agents.getPrice()).tolist())
self.record(
"CP Sold Products",
listToArray(self.cpfirm_agents.getSoldProducts()).tolist(),
)
self.record("CP Sale", listToArray(self.ksale).tolist())
self.record(
"CP Number of Workers",
listToArray(self.cpfirm_agents.countWorkers).tolist(),
)
self.record(
"CP Number of Consumers",
listToArray(self.cpfirm_agents.countConsumers).tolist(),
)
self.record(
"CP V Cost",
listToArray(self.cpfirm_agents.get_average_production_cost()).tolist(),
)
self.record(
"CP U Cost",
listToArray(self.cpfirm_agents.get_average_production_cost()).tolist(),
)
self.record("CP iL", listToArray(self.cpfirm_agents.iL).tolist())
self.record("CP iF", listToArray(self.cpfirm_agents.iF).tolist())
self.record(
"CP Loan Obtained",
listToArray(self.cpfirm_agents.loanObtained).tolist(),
)
self.record(
"CP Deposit", listToArray(self.cpfirm_agents.getDeposit()).tolist()
)
self.record(
"CP Production Cost",
listToArray(
self.cpfirm_agents.get_average_production_cost()
* self.cpfirm_agents.get_actual_production()
).tolist(),
)
self.record(
"CP Capacity",
listToArray(self.cpfirm_agents.get_actual_production()).tolist(),
)
self.record(
"CP Wage Bill", listToArray(self.cpfirm_agents.wage_bill).tolist()
)
self.record(
"CP Loan Payment", listToArray(self.cpfirm_agents.payback).tolist()
)
# Governments
self.record(
"Fiscal Policy", listToArray(self.government_agents.fiscal).tolist()
)
self.record("Expenditures", listToArray(self.expenditure).tolist())
self.record("Total Taxes", listToArray(self.totalTaxes).tolist())
self.record("Budget", listToArray(self.government_agents.budget).tolist())
# Covid
self.record("Deaths", listToArray(self.covid_death).tolist())
# Investments
greenCapitalMeanPrice = np.mean(
list(
self.cpfirm_agents.select(
self.cpfirm_agents.getUseEnergy() == "green"
).getPrice()
)
)
brownCapitalMeanPrice = np.mean(
list(
self.cpfirm_agents.select(
self.cpfirm_agents.getUseEnergy() == "brown"
).getPrice()
)
)
greenInvestment = greenCapitalMeanPrice * (
np.sum(
[
self.csfirm_agents.select(
self.csfirm_agents.getUseEnergy() == "green"
).get_capital_investment()
]
)
+ np.sum(
[
self.cpfirm_agents.select(
self.cpfirm_agents.getUseEnergy() == "green"
).get_capital_investment()
]
)
+ np.sum([self.greenEFirm.get_capital_investment()])
)
brownInvestment = brownCapitalMeanPrice * (
np.sum(
[
self.csfirm_agents.select(
self.csfirm_agents.getUseEnergy() == "brown"
).get_capital_investment()
]
)
+ np.sum(
[
self.cpfirm_agents.select(
self.cpfirm_agents.getUseEnergy() == "brown"
).get_capital_investment()
]
)
+ np.sum([self.brownEFirm.get_capital_investment()])
)
self.record("Green Investments", greenInvestment)
self.record("Brown Investments", brownInvestment)
self.record("Investment", greenInvestment + brownInvestment)
# Energy firms
self.record("GE Net Profits", listToArray(self.greenEFirm.net_profit))
self.record("GE Price", listToArray(self.greenEFirm.getPrice()))
self.record(
"GE Capital Demand", listToArray(self.greenEFirm.get_capital_demand())
)
self.record("GE Deposit", listToArray(self.greenEFirm.getDeposit()))
self.record("BE Net Profits", listToArray(self.brownEFirm.net_profit))
self.record("BE Price", listToArray(self.brownEFirm.getPrice()))
self.record(
"BE Capital Demand", listToArray(self.brownEFirm.get_capital_demand())
)
self.record("BE Deposit", listToArray(self.brownEFirm.getDeposit()))
# Climate module
if self.p.climateModuleFlag:
self.record("Climate C02 Taxes", listToArray(self.totalCarbonTaxes))
self.record("Climate C02", listToArray(self.climateModule.CO2))
self.record("Climate EM", listToArray(self.climateModule.EM))
self.record(
"Climate EM Stepwise", listToArray(self.climateModule.step_EM)
)
self.record(
"Climate C02 Concentration", listToArray(self.climateModule.conc_t)
)
self.record(
"Climate Radiative Forcing", listToArray(self.climateModule.RF)
)
self.record("Climate Temperature", listToArray(self.climateModule.T))
# Data writers
try:
if (
len(self.bank_agents.bankDataWriter) > 0
and len(self.bank_agents.bankDataWriter[0]) > 0
):
self.record(
"BankDataWriter",
listToArray(self.bank_agents.bankDataWriter)[-1][-1],
)
if (
len(self.csfirm_agents.firmDataWriter) > 0
and len(self.csfirm_agents.firmDataWriter[0]) > 0
):
self.record(
"CSFirmDataWriter",
listToArray(self.csfirm_agents.firmDataWriter)[-1][-1],
)
if (
len(self.cpfirm_agents.firmDataWriter) > 0
and len(self.cpfirm_agents.firmDataWriter[0]) > 0
):
self.record(
"CPFirmDataWriter",
listToArray(self.cpfirm_agents.firmDataWriter)[-1][-1],
)
except:
self.record(
"BankDataWriter", listToArray(self.bank_agents.bankDataWriter)
)
self.record(
"CSFirmDataWriter", listToArray(self.csfirm_agents.firmDataWriter)
)
self.record(
"CPFirmDataWriter", listToArray(self.cpfirm_agents.firmDataWriter)
)
elif self.p.covid_settings is not None and self.t > self.covidStartDate:
# Daily COVID recording
self.record(
"Covid State",
listToArray(self.consumer_agents.getCovidStateAttr("state")),
)
self.record("Infection", listToArray(self.num_infection))
self.record("Exposed", listToArray(self.num_exposed))
self.record("Susceptible", listToArray(self.num_susceptible))
self.record("Recover", listToArray(self.num_recover))
self.record("Dead", listToArray(self.num_death))
self.record("mild", listToArray(self.num_mild))
self.record("severe", listToArray(self.num_severe))
self.record("critical", listToArray(self.num_critical))
[docs]
def end(self):
"""Record evaluation measures at the end of the simulation."""
# ========================================
# Market Helper Routines
# ========================================
def _csf_forecast_demand(self):
"""Aggregate household desired consumption and distribute to CS firms"""
# Vectorized aggregation: Select consumers by working age index list
# self.workingAgeConsumers contains indices of working age consumers
working_consumers = self.aliveConsumers[self.workingAgeConsumers]
aggregated_demand = np.sum(working_consumers.get_desired_consumption())
# Vectorized firm update: set demand proportional to market share
self.csfirm_agents.call("prepareForecast")
market_shares = self.csfirm_agents.market_share
if isinstance(market_shares, list):
market_shares = np.array(market_shares)
demands = aggregated_demand * market_shares
# Iterate to set since we don't have a direct vector setter for this specific calculation pattern yet
# (Could use batch_update if we constructed a dict, but this loop is simple enough for now
# given we need to multiply scalar * vector)
for i, firm in enumerate(self.csfirm_agents):
firm.set_aggregate_demand(demands[i])
def _csf_transaction(self):
"""Consumer-goods market clearing: households buy from firms sorted by price"""
self.total_good = 0
ordered_price = OrderedDict()
self.countConsumersPerCompanyC = {}
# Build price list and reset sale records
for i in range(len(self.csfirm_agents)):
# set counter for # consumers per company
self.countConsumersPerCompanyC[i] = 0
ordered_price[i] = self.csfirm_agents[i].getPrice()
self.csfirm_agents[i].set_sale_record(0)
# lambda is mapping item to item[1]; the function indicates that we're sorting based on the price, not the name of the companies
ordered_price = OrderedDict(
sorted(ordered_price.items(), key=lambda item: item[1])
)
# Prepare ordered production by firm
self.orderedCompaniesProductionC = OrderedDict()
total_production = 0
for (
company,
price,
) in (
ordered_price.items()
): # this will be wrong since we will reintroduce price later
# print("sale price", price)
if not self.csfirm_agents[company].lockdown:
self.orderedCompaniesProductionC[company] = self.csfirm_agents[
company
].get_actual_production()
total_production += self.orderedCompaniesProductionC[company]
self.orderedCompaniesProductionC[
company
] *= self.demand_fluctuation ** (self.demand_fluctuation < 1)
# Households purchase from cheapest firms first
for i in np.random.permutation(self.workingAgeConsumers):
aConsumer = self.aliveConsumers[i]
aConsumer.setConsumption(0)
purchase = 0
desired_consumption = (
aConsumer.get_desired_consumption() * self.demand_fluctuation
)
# Pre-calculate offset for firm ID mapping
# The 'company' key in orderedCompaniesProductionC comes from loop index 'i' (0 to n_firms-1)
# The code below previously searched for identity == company + offset
# But wait, self.csfirm_agents is a list/sequence.
# If 'company' is just the index 'i' from the first loop (line 1014),
# then we can just access self.csfirm_agents[company] directly!
# The original code used:
# chosenFirm = self.csfirm_agents.select(getIdentity() - offset == company)
# If company 'i' corresponds to csfirm_agents[i], then getIdentity() should match.
# So 'company' IS the index.
# Optimization: Direct access
for company, production in self.orderedCompaniesProductionC.items():
if purchase >= desired_consumption:
break
chosenFirm = self.csfirm_agents[company]
# Double check identity if paranoid, but structure implies index alignment
# (The original code searched for identity - offset == company)
if production == 0:
continue
else:
price = ordered_price[company]
aConsumer.price = price
self.countConsumersPerCompanyC[company] += 1
# Budget-constrained purchase
if (production - desired_consumption) >= 0:
if price > 0:
purchase = np.max(
[
np.min(
[
desired_consumption,
(aConsumer.deposit + aConsumer.getIncome())
/ price,
]
),
0,
]
)
else:
purchase = desired_consumption
# if aConsumer.owner:
# print("purchase amount", purchase, (aConsumer.deposit + aConsumer.getIncome()) / price, desired_consumption, aConsumer.deposit, aConsumer.getIncome(), price, aConsumer.consumerType)
self.orderedCompaniesProductionC[company] = (
production - purchase
)
chosenFirm.updateSoldProducts(np.sum(purchase))
production -= np.sum(purchase)
actual_consumption = purchase
self.total_good += purchase
else:
# Partial fulfillment
self.orderedCompaniesProductionC[company] = 0
actual_consumption = production
chosenFirm.updateSoldProducts(np.sum(production))
self.total_good += production
production = 0
aConsumer.setConsumption(actual_consumption * price)
# print("consumer demand", aConsumer.desired_consumption, actual_consumption)
chosenFirm.update_sale_record(actual_consumption)
break
# print("total product sale", chosenFirm.getSoldProducts())
if self.p.verboseFlag:
print("total sale", self.total_good, total_production)
def _cpf_forecast_demand(self):
"""Build brown/green capital demand from CS+Energy firms"""
self.cpfirm_agents.call("prepareForecast")
firmsList = self.csfirm_agents + self.brownEFirm + self.greenEFirm
# Vectorized aggregation
# Assuming `useEnergy` is an attribute 'brown' or 'green'
use_energy = firmsList.useEnergy
capital_demands = firmsList.get_capital_demand() # Vectorized call
# Check if returned as list or numpy array, convert if needed
if isinstance(use_energy, list):
use_energy = np.array(use_energy)
if isinstance(capital_demands, list):
capital_demands = np.array(capital_demands)
b_mask = use_energy == "brown"
b_aggregated_demand = np.sum(capital_demands[b_mask])
g_aggregated_demand = np.sum(capital_demands[~b_mask])
# Vectorized assignment
# CP firms also have `useEnergy`
cp_use_energy = self.cpfirm_agents.useEnergy
if isinstance(cp_use_energy, list):
cp_use_energy = np.array(cp_use_energy)
cp_b_mask = cp_use_energy == "brown"
# Set demands using loops for now to be safe until bulk setter is confirmed safe
# self.cpfirm_agents.select(cp_b_mask).call("set_aggregate_demand", b_aggregated_demand)
# self.cpfirm_agents.select(~cp_b_mask).call("set_aggregate_demand", g_aggregated_demand)
for firm in self.cpfirm_agents:
if firm.useEnergy == "brown":
firm.set_aggregate_demand(b_aggregated_demand)
else:
firm.set_aggregate_demand(g_aggregated_demand)
def _cpf_transaction(self):
"""Capital-goods market clearing: CP sells to CS+Energy firms"""
self.firmsList = self.csfirm_agents + self.brownEFirm + self.greenEFirm
for i in range(len(self.cpfirm_agents)):
chosenFirm = self.cpfirm_agents[i]
price = chosenFirm.getPrice()
chosenFirm.set_sale_record(0)
# print("capital price", price)
K_production = chosenFirm.get_actual_production()
for j in np.random.permutation(len(self.firmsList)):
aFirm = self.firmsList[j]
# Tech-matched transactions only
if aFirm.useEnergy == self.cpfirm_agents[i].useEnergy:
aFirm = self.firmsList[j]
K_consumption = aFirm.get_capital_demand()
# print("K firm demand", K_consumption, K_production)
aFirm.set_capital_investment(0)
if K_consumption == 0:
break
if K_production == 0:
break
# Fulfill demand up to available production
if (K_production - K_consumption) >= 0:
chosenFirm.updateSoldProducts(np.sum(K_consumption))
K_production = K_production - K_consumption
else:
K_consumption = K_production
chosenFirm.updateSoldProducts(np.sum(K_production))
K_production = 0
aFirm.capital_purchase = K_consumption
aFirm.set_capital_investment(np.sum(K_consumption * price))
aFirm.set_capital_price(price)
chosenFirm.update_sale_record(K_consumption)
# print("machine sale", aFirm.capital_purchase)
def _energy_demand(self):
"""Aggregate energy demand from CS+CP firms by technology"""
b_aggregated_demand = 0
g_aggregated_demand = 0
self.firmsList = self.csfirm_agents + self.cpfirm_agents
for i in np.random.permutation(len(self.firmsList)):
aFirm = self.firmsList[i]
if aFirm.useEnergy == "brown":
b_aggregated_demand += aFirm.get_energy()
else:
g_aggregated_demand += aFirm.get_energy()
self.brownEFirm.set_energy_demand(b_aggregated_demand)
self.greenEFirm.set_energy_demand(g_aggregated_demand)
# ========================================
# COVID Helper Routines
# ========================================
def _make_random_contacts(self):
"""Generate random daily contacts in the community"""
eps = 1e-8
infection_rate = self.model.num_infection / (
len(self.model.aliveConsumers) + eps
)
num_contacts_community = self.p.num_contacts_community
dist = (infection_rate > self.p.inf_threshold) * (
self.p.covid_settings == "DIST"
)
lock = self.lockdown
if dist:
num_contacts_community = self.p.num_contacts_community / 2
if lock:
num_contacts_community = self.p.num_contacts_community / 4
# Preprocessing
contact_list = dict()
pop_size = int(len(self.aliveConsumers)) # Number of people
if pop_size > 0:
p1 = [] # Initialize the "sources"
p2 = [] # Initialize the "targets"
# Precalculate contacts
n_all_contacts = int(
pop_size * num_contacts_community * self.p.overshoot_community
) # The overshoot is used so we won't run out of contacts if the Poisson draws happen to be higher than the expected value
all_contacts = np.random.choice(
pop_size, n_all_contacts, replace=True
) # Choose people at random
if self.p.dispersion_community is None:
p_count = np.random.poisson(
num_contacts_community, pop_size
) # Draw the number of Poisson contacts for this person
else:
p_count = (
np.random.negative_binomial(
n=self.p.dispersion_community,
p=self.p.dispersion_community
/ (num_contacts_community / 1 + self.p.dispersion_community),
size=pop_size,
)
* 1
) # Or, from a negative binomial
p_count = np.array((p_count / 2.0).round(), dtype=np.int32)
# Make contacts
count = 0
for p in range(pop_size):
n_contacts = p_count[p]
these_contacts = all_contacts[
count : count + n_contacts
] # Assign people
count += n_contacts
p1.extend([p] * n_contacts)
p2.extend(these_contacts)
contact_list.update(_merge_edgelist(p1, p2, None))
return contact_list
def _make_random_contacts_in_firms(self):
"""Generate random daily contacts inside each firm"""
eps = 1e-8
infection_rate = self.model.num_infection / (
len(self.model.aliveConsumers) + eps
)
num_contacts_firms = self.p.num_contacts_firms
dist = (infection_rate > self.p.inf_threshold) * (
self.p.covid_settings == "DIST"
)
if dist:
num_contacts_firms = self.p.num_contacts_firms / 2
contact_list = dict()
for firm in self.firms:
# Preprocessing
pop_size = int(len(firm.workersList)) # Number of people
if not firm.lockdown and pop_size > 0:
p1 = [] # Initialize the "sources"
p2 = [] # Initialize the "targets"
# Precalculate contacts
n_all_contacts = int(
pop_size * num_contacts_firms * self.p.overshoot_firms
) # The overshoot is used so we won't run out of contacts if the Poisson draws happen to be higher than the expected value
all_contacts = np.random.choice(
pop_size, n_all_contacts, replace=True
) # Choose people at random
if self.p.dispersion_firms is None:
p_count = np.random.poisson(
num_contacts_firms, pop_size
) # Draw the number of Poisson contacts for this person
else:
p_count = (
np.random.negative_binomial(
n=self.p.dispersion_firms,
p=self.p.dispersion_firms
/ (num_contacts_firms / 1 + self.p.dispersion_firms),
size=pop_size,
)
* 1
) # Or, from a negative binomial
p_count = np.array((p_count / 2.0).round(), dtype=np.int32)
# Make contacts
count = 0
for p in range(pop_size):
n_contacts = p_count[p]
these_contacts = all_contacts[
count : count + n_contacts
] # Assign people
count += n_contacts
p1.extend([p] * n_contacts)
p2.extend(these_contacts)
if len(contact_list) > 0:
contact_list["p1"] = np.concatenate(
[
contact_list["p1"],
(_merge_edgelist(p1, p2, firm.workersList, 1)["p1"]),
]
)
contact_list["p2"] = np.concatenate(
[
contact_list["p2"],
(_merge_edgelist(p1, p2, firm.workersList, 1)["p2"]),
]
)
else:
contact_list.update(_merge_edgelist(p1, p2, firm.workersList, 1))
else:
continue
return contact_list
def _propagate_contacts(self, contact_list_f, contact_list_c, eps=1e-8):
"""Spread infections through firm and community contacts"""
contacts_firm = np.argwhere(contact_list_f["p1"] == self.id)
contacts_community = np.argwhere(contact_list_c["p1"] == self.id)
infected_contact_firm = self.model.aliveConsumers.select(
[
self.model.aliveConsumers.getIdentity()
== contact_list_f["p2"][np.sum(el)]
and self.model.aliveConsumers.getCovidStateAttr("state")
in [
"exposed",
"infected non-sympotomatic",
"mild",
"severe",
"critical",
]
for el in contacts_firm
]
)
infected_contact_community = self.model.aliveConsumers.select(
[
self.model.aliveConsumers.getIdentity()
== contact_list_c["p2"][np.sum(el)]
and self.model.aliveConsumers.getCovidStateAttr("state")
in [
"exposed",
"infected non-sympotomatic",
"mild",
"severe",
"critical",
]
for el in contacts_community
]
)
inf_f = len(infected_contact_firm)
inf_c = len(infected_contact_community)
infection_rate = self.model.num_infection / (
len(self.model.aliveConsumers) + eps
)
p_firm = self.p.p_contact_firms * self.p.p_sd ** (
(infection_rate > self.p.inf_threshold) * (self.p.covid_settings == "DIST")
)
p_community = self.p.p_contact_community * self.p.p_sd ** (
(infection_rate > self.p.inf_threshold) * (self.p.covid_settings == "DIST")
)
# print("infection rate", infection_rate, "firm and community", p_firm, p_community)
self.aliveConsumers.select(
self.aliveConsumers.getCovidStateAttr("state") == "susceptible"
).propagateContact(inf_f, inf_c, p_firm, p_community)
def _init_covid_exposure(self):
"""Initialize COVID states for population"""
print("start covid")
count = 0
for i in range(len(self.aliveConsumers)):
if self.aliveConsumers[i].getCovidStateAttr("state") == None:
self.aliveConsumers[i].setCovidState("susceptible", self.t, None, None)
for i in np.random.permutation(range(len(self.aliveConsumers))):
count += 1
if count <= self.p.initialExposer:
self.aliveConsumers[i].setCovidState(
"mild",
self.t,
lognormal(self.p.T_mild_severe_mean, self.p.T_mild_severe_std),
"severe",
)
else:
break
def _propagate_covid(self, eps=1e-8):
"""Daily progression of COVID spread & lockdown policy enforcement"""
# print("propagate covid")
if self.p.covid_settings == "LOCK":
if (
self.num_infection / len(self.aliveConsumers) + eps
) > self.p.p_lockdown and not self.lockdown:
# Trigger lockdown
self.lockdownCount = 1
self.lockdown = True
count_C = 0
count_K = 0
for csfirm in np.random.permutation(self.csfirm_agents):
if (
count_C / len(self.csfirm_agents) + eps
) < self.p.num_C_firms_LD:
csfirm.setLockDown()
for cpfirm in np.random.permutation(self.cpfirm_agents):
if (
count_K / len(self.cpfirm_agents) + eps
) < self.p.num_K_firms_LD:
cpfirm.setLockDown()
else:
if self.lockdown:
# Maintain lockdown
self.lockdownCount += 1
if self.lockdownCount >= self.p.duration_LD:
self.lockdown = False
self.csfirm_agents.unsetLockDown()
self.cpfirm_agents.unsetLockDown()
else:
# Normal spread through contacts
contact_list1 = self._make_random_contacts()
contact_list2 = self._make_random_contacts_in_firms()
self._propagate_contacts(contact_list2, contact_list1)
else:
contact_list1 = self._make_random_contacts()
contact_list2 = self._make_random_contacts_in_firms()
self._propagate_contacts(contact_list2, contact_list1)
self.aliveConsumers.progressCovid()
# ========================================
# Policy Helper Routines
# ========================================
def _carbon_tax_policy(self, eps=1e-8):
"""Carbon tax collection and redistribution"""
self.totalCarbonTaxes += np.sum([firm.carbonTax for firm in self.totalFirms])
self.totalTaxes += np.sum(list(self.csfirm_agents.getTax()))
self.totalTaxes += np.sum(list(self.cpfirm_agents.getTax()))
self.totalTaxes += np.sum(list(self.brownEFirm.getTax()))
self.totalTaxes += np.sum(list(self.greenEFirm.getTax()))
if self.p.settings.find("CTRa") != -1:
# Lump-sum redistribution
sharedCO2Tax = self.totalCarbonTaxes / (self.p.c_agents)
self.capitalistsIncome += np.sum(sharedCO2Tax) * (
self.p.capitalists
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "capitalists"
)
)
)
self.greenEnergyOwnersIncome += np.sum(sharedCO2Tax) * (
self.p.green_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "green_energy_owners"
)
)
)
self.brownEnergyOwnersIncome += np.sum(sharedCO2Tax) * (
self.p.brown_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "brown_energy_owners"
)
)
)
elif self.p.settings.find("CTR") != -1:
redistributive_policy = (np.sum(self.totalCarbonTaxes) * self.p.co2_tax) / (
self.GDP + eps
)
if self.p.settings.find("CTRb") != -1:
# Proportional to income
self.capitalistsIncome += np.sum(
self.capitalistsIncome * redistributive_policy
) * (
self.p.capitalists
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "capitalists"
)
)
)
self.greenEnergyOwnersIncome += np.sum(
self.greenEnergyOwnersIncome * redistributive_policy
) * (
self.p.green_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType()
== "green_energy_owners"
)
)
)
self.brownEnergyOwnersIncome += np.sum(
self.brownEnergyOwnersIncome * redistributive_policy
) * (
self.p.brown_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType()
== "brown_energy_owners"
)
)
)
elif self.p.settings.find("CTRc") != -1:
# Flat transfer based on average
self.capitalistsIncome += np.sum(
redistributive_policy
* np.mean(
[self.aliveConsumers.getWage(), self.aliveConsumers.getIncome()]
)
) * (
self.p.capitalists
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "capitalists"
)
)
)
self.greenEnergyOwnersIncome += np.sum(
redistributive_policy
* np.mean(
[self.aliveConsumers.getWage(), self.aliveConsumers.getIncome()]
)
) * (
self.p.green_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType()
== "green_energy_owners"
)
)
)
self.brownEnergyOwnersIncome += np.sum(
redistributive_policy
* np.mean(
[self.aliveConsumers.getWage(), self.aliveConsumers.getIncome()]
)
) * (
self.p.brown_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType()
== "brown_energy_owners"
)
)
)
elif self.p.settings.find("CTRd") != -1:
# Progressive redistribution
self.capitalistsIncome += np.sum(
1 / (self.capitalistsIncome + eps) * redistributive_policy
) * (
self.p.capitalists
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "capitalists"
)
)
)
self.greenEnergyOwnersIncome += np.sum(
1 / (self.greenEnergyOwnersIncome + eps) * redistributive_policy
) * (
self.p.green_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType()
== "green_energy_owners"
)
)
)
self.brownEnergyOwnersIncome += np.sum(
1 / (self.brownEnergyOwnersIncome + eps) * redistributive_policy
) * (
self.p.brown_energy_owners
- len(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType()
== "brown_energy_owners"
)
)
)
else:
self.totalTaxes += self.totalCarbonTaxes
def _hire(self):
"""Match unemployed workers to firms with vacancies"""
self.workingConsumers = self.aliveConsumers.select(
self.aliveConsumers.isWorker() == True
)
for worker in self.workingConsumers:
if worker.isEmployed() != True:
suitable_firm_found = False
for firm in np.random.permutation(self.firms):
labour_demand = firm.labour_demand
if firm.getNumberOfLabours() < labour_demand:
worker.receiveHiring(firm.id)
firm.workersList.append(worker.id)
firm.wages[worker.id] = worker.getWage()
suitable_firm_found = True
break
if not suitable_firm_found:
break
def _fiscal_policy(self):
"""Government fiscal support to households and firms"""
if self.scenario == "1":
# Household transfers
[
self.aliveConsumers.gov_transfer(
self.alpha_h
* np.mean(
self.aliveConsumers.select(
self.aliveConsumers.getConsumerType() == "workers"
).getIncome()
)
)
]
if self.scenario == "1":
# Firm transfers based on revenue threshold
for firm in self.firms:
revenue = firm.soldProducts * firm.getPrice()
transfer_threshold = self.p.transfer_threshold
if revenue < transfer_threshold * firm.fix_cost:
self.firms.gov_transfer(self.alpha_f * firm.fix_cost)
def _induce_climate_shock(self):
"""Apply climate shock mortality and wealth losses"""
# Climate shock application.
# Two modes:
# - "AggPop": use aggregate population mortality PM from climate module (same probability across agents)
# - "Idiosyncratic": use individual survival outcomes from climate module (heterogeneous)
if (
self.climateShockMode == "AggPop"
and self.climateModule.shockHappens[0] == True
):
print("start shock")
# Number of deaths implied by climate module's population mortality (PM)
deadIDs = np.random.permutation(self.aliveConsumers.getIdentity())[
: np.max([int(self.climateModule.getPM()[0]), 0])
]
# Fractional wealth loss applied uniformly to survivors (proxy for asset damages)
loss_percentage = np.max([int(self.climateModule.getPM()[0]), 0]) / len(
self.aliveConsumers
)
# Mark selected agents as dead
[
self.aliveConsumers.select(
self.aliveConsumers.getIdentity() == ID
).setDead()
for ID in deadIDs
]
# Rebuild alive population view and working-age subset
self.aliveConsumers = self.aliveConsumers.select(
self.aliveConsumers.isDead() != True
)
self.workingAgeConsumers = [
idx
for idx in range(len(self.aliveConsumers))
if self.aliveConsumers[idx].getAgeGroup() == "working"
]
# Apply proportional wealth loss to survivors
self.aliveConsumers.wealth_loss(loss_percentage)
aliveIDs = self.aliveConsumers.getIdentity()
print(
f"Climate shock happens: {np.max([int(self.climateModule.getPM()[0]), 0])} people died!"
)
# Remove deceased workers from firms' rosters
for firm in self.firms:
if len(firm.workersList) > 0:
for workerID in firm.workersList:
if workerID not in aliveIDs:
firm.workersList.remove(workerID)
# Reset shock flag so it does not re-trigger immediately
self.climateModule.shockHappens = False
elif (
self.climateShockMode == "Idiosyncratic"
and self.climateModule.shockHappens[0] == True
):
# Number of deaths implied by individual survival outcomes: alive_post_shock provided by climate module
deadIDs = np.random.permutation(self.aliveConsumers.getIdentity())[
: np.max(
[
int(
len(self.aliveConsumers)
- self.climateModule.getAliveConsumersPostShock()[0]
),
0,
]
)
]
[
self.aliveConsumers.select(
self.aliveConsumers.getIdentity() == ID
).setDead()
for ID in deadIDs
]
# Fractional wealth loss computed from realized death share (proxy for distributed damages)
loss_percentage = np.max(
[
int(
len(self.aliveConsumers)
- self.climateModule.getAliveConsumersPostShock()[0]
),
0,
]
) / len(self.aliveConsumers)
# Refresh alive/working-age views
self.aliveConsumers = self.aliveConsumers.select(
self.aliveConsumers.isDead() != True
)
self.workingAgeConsumers = [
idx
for idx in range(len(self.aliveConsumers))
if self.aliveConsumers[idx].getAgeGroup() == "working"
]
# Apply proportional wealth loss to survivors
self.aliveConsumers.wealth_loss(loss_percentage)
aliveIDs = self.aliveConsumers.getIdentity()
# Update firms' worker lists after mortality
for firm in self.firms:
if len(firm.workersList) > 0:
for workerID in firm.workersList:
if workerID not in aliveIDs:
firm.workersList.remove(workerID)
# Reset shock flag
self.climateModule.shockHappens = False