Commit 94f983bb authored by Liyao Zhu

agent, graph full build

game rebuilding...
parent d36fb474
Pipeline #1156 failed
agent.py

"""
This file contains an agent class, where agents use Q-learning to evolve
strategies for playing a collective-risk game. For the multi-armed bandit
problem, epsilon-greedy is implemented. Agents don't recognise their
opponents, nor have memory of previous rounds of a game. Their actions are
based solely on their own Q-table, whose states consist of round numbers and
available actions.

Author: Liyao Zhu  liyao@student.unimelb.edu.au
Date:   Apr. 2019
"""

import numpy as np


class Agent:
    def __init__(self, rounds, initialWealth, availableActions, alpha=0.1,
                 gamma=0.9, epsilon=0.1):
        self.R = rounds
        self.initialWealth = initialWealth
        self.wealth = initialWealth
        self.availableActions = availableActions
        # self.iteration = 0

        """Initialise Q-table to small random numbers"""
        self.qTable = np.random.rand(self.R, len(self.availableActions)) * 0.01

        "Q-learning parameters"
        self.learnRate = alpha
        self.discount = gamma
        self.epsilon = epsilon

    def updateReward(self, round, action, loss):
        """
        Update wealth from this round's outcome and apply the Q-learning
        update to the (round, action) entry.

        :param round: the round number (0-indexed), used as the state
        :param action: the action taken this round, from availableActions
        :param loss: the fraction of wealth lost this round (0 if no risk event)
        """
        newWealth = self.wealth * (1 - action) * (1 - loss)
        reward = newWealth - self.wealth
        self.wealth = newWealth
        index = self.availableActions.index(action)

        if round == self.R - 1:
            """At goal state, no future value"""
            maxNextQ = 0
        elif round < self.R - 1:
            """Not at goal state"""
            maxNextQ = max(self.qTable[round + 1])
        else:
            print("ERROR: Illegal round number")
            exit(2)

        self.qTable[round][index] += self.learnRate * (
                reward + self.discount * maxNextQ - self.qTable[round][index])
        # print("QTABLE:", self.qTable)
        # if round == self.R - 1:
        #     self.iteration += 1
        #     print("Player iteration +1 =", self.iteration)

    def chooseAction(self, roundNumber):
        """Method: Q-learning"""

        """EPSILON GREEDY"""
        # Epsilon-decrease variant:
        # if np.random.uniform(0, 1) <= 1 * self.epsilon ** self.iteration:
        if np.random.uniform(0, 1) <= self.epsilon:
            return np.random.choice(self.availableActions)
        else:
            index = np.argmax(self.qTable[roundNumber])
            return self.availableActions[index]

    def getStrategy(self):
        """
        Get the current strategy without randomness, for analytical use.

        :return: a dictionary of actions in all rounds
        """
        strategy = {}
        for r in range(self.R):
            index = np.argmax(self.qTable[r])
            strategy[r] = self.availableActions[index]
        return strategy

    def getWealth(self):
        return self.wealth

    def resetWealth(self):
        self.wealth = self.initialWealth
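For context, updateReward implements the standard one-step Q-learning update, Q(s, a) += alpha * (r + gamma * max_a' Q(s', a') - Q(s, a)), with the round number standing in for the state. A minimal sketch of driving the class on its own; the action list, loss rule, seed, and episode count below are invented for illustration:

import numpy as np
from agent import Agent

np.random.seed(0)                            # arbitrary seed, illustrative only
player = Agent(rounds=2, initialWealth=100, availableActions=[0, 0.2, 0.4])

for episode in range(200):                   # a few hundred toy episodes
    player.resetWealth()
    for r in range(2):
        action = player.chooseAction(r)      # epsilon-greedy over the Q-table row
        loss = 0.5 if action == 0 else 0.0   # hypothetical risk outcome
        player.updateReward(r, action, loss)

print(player.getStrategy())                  # greedy action per round, e.g. {0: 0.2, 1: 0.2}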
game.py

import matplotlib.pyplot as plt
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import copy
import utilis, agent, graph


N = 100
# K = 2
# P = 0.8
I = 1000
R = 1
Actions = [0, 0.2, 0.4, 0.6, 0.8]   # sorted in ascending order


class game:
    def __init__(self, K=2, P=0.8):
        # datamap = utilis.read()
        # self.N = datamap['N']           # N-Player Game
        # self.M = datamap['M']           # Randomly choose M players to play the game (normally 2)
        # self.RF = datamap['RF']         # Parsed number of risk function chosen for the game
        # self.alpha = datamap['alpha']   # Loss fraction
        # self.R = datamap['R']           # Rounds of a game

        self.N = N
        self.M = 2
        self.RF = 0
        self.alpha = 1
        self.R = R
        self.threshold = 0.5   # Threshold
        # self.actions = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
        self.actions = Actions
        self.iterations = I

        """
        | 2-Player Game Graph Model:
        |
        | P: Probability of rewiring each original edge in the graph
        |
        | K: The number of edges (games) connected to each player. Has to be
        |    an even number. If 1 is desired, don't use the graph model.
        |    Max k: n - 2 (for even n) | n - 1 (for odd n)
        |    * k can be odd as n - 1 (for even n). In cases k = n - 1
        |      (for all n) -> a fully connected graph
        """
        # self.graph_based = True
        self.rewire_p = P
        self.rewire_k = K
        # assert (self.rewire_k < self.N)
        self.graph = graph.Graph(self.N, self.rewire_k, self.rewire_p)

        "Create players"
        self.players = []
        IW = 100   # Initial Wealth - can be input, can be variable to distinguish population
        self.totalWealth = self.M * IW   # subject to change
        for i in range(self.N):
            self.players.append(agent.Agent(self.R, IW, self.actions))

        "Check if N is divisible by M"
        if self.N % self.M != 0:
            print("ERROR, N is not divisible by M, abort")
            exit(1)

    def lossfrac(self):
        """
        The percentage of wealth that players are going to lose if the
        collective risk happens.
        """
        return self.alpha

    def riskfunc(self, RF, contribution, totalwealth):
        """
        The probability of the collective risk happening, given the total
        contribution.
        """
        proportion = contribution / totalwealth
        if RF == 0:
            # probably parse more parameters here
            return 1 - proportion
        elif RF == 1:
            if proportion >= self.threshold:
                return 0
            else:
                return 1
        elif RF == 2:
            if proportion < self.threshold:
                return 1 - proportion / self.threshold
            else:
                return 0
        return "error"

    def computeRisk(self, contrib_sum, haveRisk=True):
        if haveRisk:
            return self.riskfunc(self.RF, contrib_sum, self.totalWealth)
        else:
            return 0

    def selectPlayers(self):
        """
        Randomly select M players from the population of size N for each game.

        :return: A 2-dimensional array of player indices, one row of M
                 players per game
        """
        return np.random.permutation(self.N).reshape((self.N // self.M, self.M))

    def play2(self):
        # lastStrategyTable = np.zeros((self.N, self.R))
        # sameStrategyRounds = 0
        results = np.zeros((self.iterations, self.R, len(self.actions)))

        """ITERATION"""
        for iter in range(self.iterations):
            actionTable = np.zeros((self.N, self.R))
            strategyTable = np.zeros((self.R, self.N))   # DIFFERENT AXIS R-N
            lossTable = np.zeros((self.N, self.R))

            for playerIndex in range(self.N):   # For each player
                player = self.players[playerIndex]
                player.resetWealth()            # reset initial wealth
                for r in range(self.R):         # For each round
                    action = player.chooseAction(r)
                    actionTable[playerIndex][r] = action
                    strategyTable[r][playerIndex] = player.getStrategy()[r]

            playersNo = self.graph.select()

            for r in range(self.R):
                for [i, j] in playersNo:
                    pool = self.players[i].getWealth() * actionTable[i][r] + \
                           self.players[j].getWealth() * actionTable[j][r]
                    risk = self.computeRisk(pool)   # riskfunc reads self.totalWealth internally
                    for p in [i, j]:
                        if np.random.uniform(0, 1) < risk:
                            lossTable[p, r] += \
                                self.lossfrac() / self.graph.getNodesNumber()[p]
                for i in range(self.N):
                    self.players[i].updateReward(r, actionTable[i][r],
                                                 lossTable[i][r])

            """Strategy Stats"""
            # if np.array_equal(strategyTable, lastStrategyTable):
            #     sameStrategyRounds += 1
            # else:
            #     sameStrategyRounds = 0
            #     lastStrategyTable = strategyTable

            for r in range(self.R):
                unique, count = np.unique(strategyTable[r], return_counts=True)
                round_counter = dict(zip(unique, count))
                # print("Round ", r, round_counter)
                for a in range(len(self.actions)):
                    if self.actions[a] in round_counter:
                        results[iter, r, a] = round_counter[self.actions[a]]
        return results
    def playM(self):
        """
        Play an iteration of N/M games between M players, of R rounds
        """
        iteration = 1
        results = []
        last_counter = []
        same_results = 0

        # if self.graph_based:
        #     playersNo = self.graphSelect()

        while (iteration <= self.iterations) & (same_results < 50 or True):   # iteration starts
            """ITERATION"""
            iteration_counter = []

            for player in self.players:   # reset initial wealth
                player.resetWealth()

            playersNo = self.selectPlayers()
            print(playersNo)

            """GAME"""
            for m_players in playersNo:   # for each set of m players -- a game
                game_counter = []         # STATS: list of the round counters
                print("A new game starts, among players:", m_players, "\nContribution initialised")
                contributions = {}        # accumulated contributions of each round

                """ROUND"""
                for r in range(self.R):   # for each round
                    round_counter = {}    # STATS: counting the number of each action
                    for action in self.actions:
                        round_counter[action] = 0
                    ratio = {}
                    print("RRRRRRRRRRRRRRRRound", r)

                    """PLAYER'S TURN"""
                    for m in m_players:   # for each player
                        print("Player", m, "is playing:")
                        ratio[m] = self.players[m].chooseAction(r)   # Choose Action (a ratio)
                        round_counter[ratio[m]] += 1
                        currentWealth = self.players[m].getWealth()
                        if m not in contributions:
                            contributions[m] = 0
                        print("Ratio:", ratio[m], "current wealth before:", currentWealth)
                        contributions[m] += ratio[m] * currentWealth
                        print("Contribute: ", ratio[m] * currentWealth)
                    """PLAYER'S TURN END"""

                    print("All players contributed, sum:", sum(contributions.values()),
                          "total wealth:", self.totalWealth)
                    risk = self.computeRisk(sum(contributions.values()))
                    print("risk:", risk)

                    for m in m_players:
                        if np.random.uniform(0, 1) < risk:   # "<" since np.random.uniform is [0, 1)
                            print("XXXXXXXX Tragedy happened to Player", m,
                                  "losing", self.lossfrac(), "of wealth")
                            loss = self.lossfrac()
                        else:
                            print("NOTHING HAPPENED TO PLAYER", m)
                            loss = 0
                        self.players[m].updateReward(r, ratio[m], loss)
                    print("R----------Round finished, round counter: ", round_counter)
                    game_counter.append(round_counter)
                """ROUND END"""

                print("G======Game finished. game counter:", game_counter)
                if not iteration_counter:
                    iteration_counter = copy.deepcopy(game_counter)
                else:
                    for r in range(self.R):
                        iteration_counter[r] = utilis.combine_dict(iteration_counter[r],
                                                                   game_counter[r])
            """GAME END"""

            print("I~~~~~Iteration ", iteration, " finished. Iteration Counter:")
            print(iteration_counter)
            results.append(iteration_counter)
            iteration += 1
            if last_counter:
                if last_counter == iteration_counter:
                    same_results += 1
                else:
                    same_results = 0
            last_counter = copy.deepcopy(iteration_counter)
        """ITERATION END"""

        print("GAME FINISHED. RESULTS:")
        for i in range(len(results)):
            print("iteration", i + 1, results[i])
def stackBar(data, r):   # Plotting the data for round r
    A = len(Actions)
    p = []
    mean = np.zeros((A, I))   # count of each action in each iteration
    ind = np.arange(I)
    width = 0.3
    for iter in range(I):
        for a in range(A):
            mean[a, iter] = data[iter, r, a]
    base = 0
    for a in range(A):
        p.append(plt.bar(ind, mean[a], width, bottom=base))
        base += mean[a]
    plt.ylabel('Number of Actions')
    plt.xlabel('Time (iterations)')
    plt.title('Average Number of Actions in Round ' + str(r + 1))
    # plt.xticks(ind, ('G1', 'G2', 'G3', 'G4', 'G5'))
    # plt.yticks(np.arange(0, 81, 10))
    plt.legend(tuple([p[x][0] for x in range(A)][::-1]), tuple(Actions[::-1]))
    plt.show()


def stackPlot(data, r, k, p):
    A = len(Actions)
    x = range(I)
    y = np.zeros((I, A))
    for i in range(I):
        y[i] = data[i][r]
    y = np.vstack(y.T)
    fig, ax = plt.subplots()
    # grays = np.arange(0, 1, (max(Actions) - min(Actions)) / A)
    ax.stackplot(x, y, labels=Actions, colors=[str(1 - a) for a in Actions])
    ax.legend(loc='lower right')
    plt.ylabel('Number of Actions')
    plt.xlabel('Time (iterations)')
    plt.title('Average Number of Actions in Round ' + str(r + 1) +
              '\n(k=' + str(k) + ', p=' + str(p) + ')')
    plt.show()


def rep(rept, K, P, r=0, graph=None):
    data = np.zeros((I, R, len(Actions)))
    for re in range(rept):
        print("REP", re)
        g = game(K=K, P=P)
        result = g.play2()
        data += result
    data /= rept
    print(data)

    if graph == "stackBar":
        stackBar(data, 0)
    elif graph == "stackPlot":
        if r == -1:
            for i in range(R):
                stackPlot(data, i, K, P)
        else:
            stackPlot(data, r, K, P)

    # Taking the mean of the last 100 iterations --- need to justify
    total = 0
    for i in range(-1, -101, -1):
        total += np.sum(data[i, r] * Actions)
    return total / 100


def graph_kp3d(Klist=[2, 4, 8, 10], Plist=[0.2, 0.4, 0.6, 0.8], repet=30):
    K = Klist
    P = Plist
    meanA = np.zeros((len(K), len(P)))
    for k in range(len(K)):
        for p in range(len(P)):
            meanA[k][p] = rep(repet, K[k], P[p])

    P, K = np.meshgrid(P, K)
    fig = plt.figure()
    ax = fig.gca(projection='3d')
    surf = ax.plot_surface(P, K, meanA, cmap=cm.coolwarm,
                           linewidth=0, antialiased=False)
    fig.colorbar(surf, shrink=0.5, aspect=5)
    plt.show()


def main():
    graph_kp3d()
    # rep(50, 99, 0, graph="stackPlot")


if __name__ == '__main__':
    main()
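With the four modules side by side, a single (K, P) configuration can be exercised without the full 3-D sweep in main(), which plays I = 1000 iterations, repet times, for every (K, P) pair. A sketch, e.g. from an interactive session after `from game import *`; the repetition count of 10 and the (K, P) values are arbitrary:

mean_contrib = rep(10, K=4, P=0.6, r=0, graph="stackPlot")   # averages 10 runs of play2()
print("Mean contribution over the last 100 iterations:", mean_contrib)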
graph.py 0 → 100644
"""
This file contains a graph class, used to represent social connections among
social dilemma games. Each of the N nodes (players) starts with K edges
(connections) before a rewiring process in which each edge is rewired
randomly with probability P. If K == N - 1, the graph is well-mixed and P
doesn't matter. After the graph is set, edges can be drawn to represent
games between pairs of players, covering all players. A player may be drawn
more than once in one selection, so that every player is drawn at least once.
Author: Liyao Zhu liyaoz@student.unimelb.edu.au
Date: Apr. 2019
"""
import numpy as np


class Graph:
    def __init__(self, N, K, P):
        self.N = N   # Number of players
        self.K = K   # Number of edges/connections each player has originally
        self.P = P   # Rewiring probability
        self.edges = []
        self.selectedNodes = {}

        if K == N - 1:
            """Well-mixed graph, no rewiring"""
            for i in range(N):
                for j in range(i + 1, N):
                    self.edges.append((i, j))
        elif K < N - 1:
            assert K % 2 == 0
            k_half = int(K / 2)

            """Create the original graph (equal to p = 0)"""
            for i in range(N):
                for j in range(1, k_half + 1):
                    self.edges.append((i, (i + j) % N))

            """Randomly rewire each edge with prob p, starting from distance 1"""
            for j in range(1, k_half + 1):
                for i in range(N):
                    if P > np.random.uniform(0, 1):
                        new_set = [v for v in range(N) if v != i and (i, v) not
                                   in self.edges and (v, i) not in self.edges]
                        if len(new_set) > 0:
                            new = np.random.choice(new_set)
                            self.edges.append((i, new))
                            old = (i + j) % self.N
                            self.edges.remove((i, old))
                            # print("Rewiring (", i, old, ") to: (", i, new)
        else:
            print("ERROR: Illegal K or N value.")
            exit(3)

    def select(self):
        """
        Randomly select edges from the graph, so that each player is drawn at
        least once.

        :return: A list of tuples, containing players' indices
        """
        edges = self.edges
        nodes = list(range(self.N))
        select = []
        selectedNodes = {i: 0 for i in range(self.N)}

        while edges:   # Loop while edges is not empty
            i, j = edges[np.random.randint(0, len(edges))]
            # print("selected nodes:", i, j)
            select.append((i, j))
            nodes.remove(i)
            nodes.remove(j)
            selectedNodes[i] += 1
            selectedNodes[j] += 1
            # print("Remaining nodes:", nodes)
            edges = [(a, b) for (a, b) in edges if (a != i) and (a != j)
                     and (b != i) and (b != j)]
            # print("after removal", edges)

        while nodes:
            v = nodes.pop(np.random.randint(0, len(nodes)))
            v_edges = [(i, j) for (i, j) in self.edges if i == v or j == v]
            i, j = v_edges[np.random.randint(len(v_edges))]
            select.append((i, j))
            selectedNodes[i] += 1
            selectedNodes[j] += 1

        # print("Number of times each node was selected:", selectedNodes)
        self.selectedNodes = selectedNodes
        return select

    def getNodesNumber(self):
        """
        :return: A dictionary specifying how many times each player was drawn
                 in the last select()
        """
        return self.selectedNodes

    def getEdgeList(self):
        return self.edges
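The constructor builds a ring lattice of even degree K and rewires each edge with probability P, in the spirit of a Watts-Strogatz small-world graph. A quick sanity check of the invariants; the seed and sizes below are arbitrary:

import numpy as np
from graph import Graph

np.random.seed(1)                  # arbitrary, for reproducibility
g = Graph(10, 4, 0.3)              # 10 players, degree 4, rewiring probability 0.3

print(len(g.getEdgeList()))        # rewiring preserves the edge count: N * K / 2 = 20
games = g.select()                 # one 2-player game per selected edge
drawn = g.getNodesNumber()
assert all(n >= 1 for n in drawn.values())   # every player is drawn at least once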
utilis.py  @@ -11,3 +11,8 @@ def read():

    map = {'N': 100, 'M': 2}
    return map


def combine_dict(dict1, dict2):
    return {k: (dict1[k] + dict2[k]) for k in dict1}
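combine_dict merges two round counters by summing counts key-wise. It iterates over dict1's keys only, so both counters are assumed to share the same action keys, which holds for the counters built in playM since each is initialised from self.actions. For example:

c1 = {0: 3, 0.2: 1}
c2 = {0: 2, 0.2: 4}
print(combine_dict(c1, c2))   # -> {0: 5, 0.2: 5}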