-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathenv.py
166 lines (135 loc) · 5.26 KB
/
env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import random
from typing import Dict, Union, List, Tuple

import numpy as np

from .products import *
# Key of the ``action`` dict read by ``RetailEnv.step``; its value is the
# per-product mapping handed to ``RetailEnv._customer_event``.
PROMOTION_PRDTS = "promotion_prdts"
# Keys of the customer dict built by ``RetailEnv._random_customer``.
# Need vector: one entry per category consumption rate.
CUSTOMER_NEED = "customer_need"
# Scalar multiplied onto the need vector before the category softmax.
CUSTOMER_TEMPERATURE = "customer_temperature"
# Amount the customer can spend; purchases are subtracted from it.
CUSTOMER_BUDGET = "customer_budget"
def sample_softmax(x: Union[List[float], np.ndarray], t: float = 1) -> int:
    """
    Sample an index from the softmax distribution over ``x``.

    The probability of index ``i`` is proportional to ``exp(x[i] / t)``.
    Scores are shifted before exponentiation so that the largest exponent is
    exactly 0, which keeps ``np.exp`` from overflowing.

    :param x: One-dimensional, non-empty vector of scores.
    :param t: The temperature dividing the scores. Must be non-zero; a
        negative value inverts the preference order.
    :return: The sampled index into ``x`` as a plain ``int``.
    :raises ValueError: If ``x`` is empty or ``t`` is zero.
    """
    scores = np.asarray(x, dtype=float)
    if scores.size == 0:
        raise ValueError("x must not be empty")
    if t == 0:
        raise ValueError("temperature t must be non-zero")
    # After dividing by t the largest exponent must be 0: that is the maximum
    # score for positive t, but the minimum score when t is negative.
    shift = np.max(scores) if t > 0 else np.min(scores)
    weights = np.exp((scores - shift) / t)
    probs = weights / np.sum(weights)
    # An int first argument samples from np.arange(len(probs)).
    return int(np.random.choice(len(probs), p=probs))
class RetailEnv:
    """
    Retail simulation environment driven by a product configuration.

    Each call to :meth:`step` simulates a random number of customers. Every
    customer repeatedly picks a category and then a product (both via
    ``sample_softmax``) until the budget is used up. The purchased
    quantities are aggregated per product id.

    Attributes set by the constructor:
        prdt_config: The configuration mapping passed in.
        n_category / n_product: Read from ``prdt_config``.
        consumption_rate: One ``CONSUMPTION_RATE`` value per category.
        prdts: List of ``(id, price, cost)`` tuples for every product.
        time_step: Number of completed :meth:`step` calls since construction
            or the last :meth:`reset`.
        last_reward: Reward computed by the most recent :meth:`step`
            (``None`` before the first step).
    """

    def __init__(
        self,
        prdt_config,
        customer_temperature_limit: float = 3.0,
        n_customer_limit: int = 100,
        budget_limit: float = 100.0,
        goal: str = "sale",
    ):
        """
        Store the configuration and flatten the product catalogue.

        :param prdt_config: Mapping indexed with ``NUM_CATEGORIES``,
            ``NUM_PRODUCTS``, ``CATEGORIES``, ``COMP_GRAPH`` and
            ``EXCH_GRAPH``; each category is indexed with
            ``CONSUMPTION_RATE`` and ``PRODUCTS``.
        :param customer_temperature_limit: Upper bound of the random
            customer temperature.
        :param n_customer_limit: Maximum number of customers per step.
        :param budget_limit: Upper bound of the random customer budget.
        :param goal: ``"sale"`` or ``"profit"``; selects the reward computed
            in :meth:`step`. It is validated there, not here.
        """
        self.prdt_config = prdt_config
        self.customer_temperature_limit = customer_temperature_limit
        self.budget_limit = budget_limit
        self.n_customer_limit = n_customer_limit
        self.goal = goal
        self.n_category = self.prdt_config[NUM_CATEGORIES]
        self.n_product = self.prdt_config[NUM_PRODUCTS]
        self.consumption_rate = [
            cat[CONSUMPTION_RATE] for cat in self.prdt_config[CATEGORIES]
        ]
        self.prdts = []
        for cats in self.prdt_config[CATEGORIES]:
            for prdt in cats[PRODUCTS]:
                self.prdts.append(
                    (prdt[PRDT_ID], prdt[PRDT_PRICE], prdt[PRDT_COST])
                )
        self.time_step: int = 0
        self.last_reward = None

    def _random_customer(self):
        """
        Draw a random customer.

        :return: Dict with a need vector (consumption rate plus uniform
            noise in ``[0, 1)`` per category), a temperature in
            ``[0, customer_temperature_limit)`` and a budget in
            ``[0, budget_limit)``.
        """
        cust = {
            CUSTOMER_NEED: [random.random() + x for x in self.consumption_rate],
            CUSTOMER_TEMPERATURE: random.random() * self.customer_temperature_limit,
            CUSTOMER_BUDGET: random.random() * self.budget_limit,
        }
        return cust

    def _customer_event(self, prdt_sale: Dict[str, float]) -> List[str]:
        """
        Simulate the shopping trip of one random customer.

        :param prdt_sale: Mapping from product id to a promotion value;
            products missing from the mapping are treated as 0.0.
        :return: Ids of the purchased products, in purchase order.
        """
        customer = self._random_customer()
        customer_budget = customer[CUSTOMER_BUDGET]
        customer_temperature = customer[CUSTOMER_TEMPERATURE]
        customer_need = np.array(customer[CUSTOMER_NEED])
        bought_history = []
        # The budget is checked before each purchase only, so the last
        # purchase may push it below zero.
        # NOTE(review): the loop ends only because each purchase subtracts a
        # price; it would not terminate if every sampled price were <= 0.
        while customer_budget > 0:
            # Pick a category: softmax over temperature-scaled needs.
            score = customer_temperature * customer_need
            category_idx = sample_softmax(score)
            # Pick a product within that category.
            # TODO: lazy implementation, to be updated later.
            category = self.prdt_config[CATEGORIES][category_idx]
            products = category[PRODUCTS]
            # Attraction per product: the larger of the trend/promotion term
            # and a loyalty draw worth 100 with probability PRDT_LOYALTY.
            attraction = np.array(
                [
                    max(
                        prdt[PRDT_TREND] * self.time_step
                        + prdt_sale.get(prdt[PRDT_ID], 0.0)
                        * prdt[PRDT_PRICE_ELASTICITY],
                        (random.random() < prdt[PRDT_LOYALTY]) * 100,
                    )
                    for prdt in products
                ]
            )
            chosen = products[sample_softmax(attraction)]
            volume = chosen[PRDT_VOLUME]
            customer_budget -= chosen[PRDT_PRICE]
            # The purchase reduces the need of the bought category ...
            customer_need[category_idx] -= volume
            # ... reduces by half the volume the need of categories linked
            # from it in COMP_GRAPH ...
            for idx, jdx in self.prdt_config[COMP_GRAPH]:
                if idx == category_idx:
                    customer_need[jdx] -= 0.5 * volume
            # ... and raises by half the volume the need of categories that
            # link to it in EXCH_GRAPH.
            # NOTE(review): presumably complementary / exchangeable category
            # graphs - confirm the intended signs against products.py.
            for idx, jdx in self.prdt_config[EXCH_GRAPH]:
                if jdx == category_idx:
                    customer_need[idx] += 0.5 * volume
            bought_history.append(chosen[PRDT_ID])
        return bought_history

    def step(self, action):
        """
        Simulate one time step.

        Between 1 and ``n_customer_limit`` customers (inclusive, uniformly
        drawn) each go shopping; their purchases are counted per product.
        The reward selected by ``goal`` is stored in ``last_reward``.

        :param action: Mapping whose ``PROMOTION_PRDTS`` entry is the
            product-id -> promotion-value dict.
        :return: A 1-tuple ``(qty,)`` where ``qty`` maps product id to the
            number of units bought this step.
        :raises ValueError: If ``goal`` is neither ``"sale"`` nor
            ``"profit"``.
        """
        promotion_dict: Dict[str, float] = action[PROMOTION_PRDTS]
        # Fail before running the simulation instead of after it.
        if self.goal not in ("sale", "profit"):
            raise ValueError("Invalid goal")
        # WIP
        total_customers = random.randint(1, self.n_customer_limit)
        qty = {}
        for _ in range(total_customers):
            for prdt_id in self._customer_event(promotion_dict):
                qty[prdt_id] = qty.get(prdt_id, 0) + 1
        if self.goal == "sale":
            reward = self.get_sale(qty, self.prdts)
        else:
            reward = self.get_profit(qty, self.prdts)
        # The return shape is kept as the original 1-tuple; the reward is
        # exposed through an attribute so it is no longer thrown away.
        self.last_reward = reward
        self.time_step += 1
        return (qty,)

    @staticmethod
    def _weighted_total(qty, prdts, unit_value):
        """Sum ``count * unit_value(product)`` over the ids in ``qty``.

        Ids without a matching product contribute nothing; when several
        products share an id, the first one in ``prdts`` is used.
        """
        first_by_id = {}
        for prdt in prdts:
            first_by_id.setdefault(prdt[0], prdt)
        total = 0
        for prdt_id, count in qty.items():
            if prdt_id in first_by_id:
                total += count * unit_value(first_by_id[prdt_id])
        return total

    @staticmethod
    def get_sale(qty: Dict[str, float], prdts: List[Tuple[str, float, float]]) -> float:
        """
        Compute the revenue of a set of purchases.

        :param qty: Mapping from product id to quantity sold.
        :param prdts: ``(id, price, cost)`` tuples.
        :return: Sum of ``quantity * price`` over all known product ids.
        """
        return RetailEnv._weighted_total(qty, prdts, lambda prdt: prdt[1])

    @staticmethod
    def get_profit(qty: Dict[str, float], prdts: List[Tuple[str, float, float]]) -> float:
        """
        Compute the profit of a set of purchases.

        :param qty: Mapping from product id to quantity sold.
        :param prdts: ``(id, price, cost)`` tuples.
        :return: Sum of ``quantity * (price - cost)`` over all known
            product ids.
        """
        return RetailEnv._weighted_total(
            qty, prdts, lambda prdt: prdt[1] - prdt[2]
        )

    def render(self):
        """Render the environment (not implemented yet)."""
        # graphic render? to be implemented
        pass

    def reset(self):
        """Restart the episode: rewind the clock and clear the last reward."""
        self.time_step = 0
        self.last_reward = None