import random
from copy import copy

import cv2 as cv
import numpy as np
import pydirectinput

from window_capture import WindowCapture
from vision import Vision
from config_file import UserConfigs
from utils import mse
from game_base_class import GameBase


class Pickaxe_Field(GameBase):
    """Screen-driven controller for a 4x4 tile-merging ("pickaxe") board.

    The board is read by template-matching the twelve ``pickaxe/<n>.png``
    images against a cropped screenshot; each cell of the observation holds
    the matched template number (the tile level) or 0 when nothing matched.
    Moves are performed by dragging the mouse from a fixed screen point.
    """

    data_value_grid = []
    data_coordinates = []

    episode_step = 0

    SIZE = 880
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 3)
    ACTION_SPACE_SIZE = 4

    observation = None

    last_score = 0
    last_reward = 0
    kill_counter = 0
    last_action = 0

    # Action codes used by action(), shift_possible() and predict_best_move().
    MOVE_RIGHT = 0
    MOVE_LEFT = 1
    MOVE_DOWN = 3
    MOVE_UP = 2

    # Edge length in pixels of one board cell inside the cropped screenshot.
    SQUARE_DIM = 250
    # Number of cells per board side.
    GRID_SIZE = 4

    def __init__(self, overlay):
        """Build the cell lookup table and load the tile templates.

        Args:
            overlay: forwarded unchanged to ``GameBase.__init__``.
        """
        super().__init__(overlay)

        cells = self.GRID_SIZE
        dim = self.SQUARE_DIM

        self.data_value_grid = np.zeros((cells, cells), dtype=int)

        # data_coordinates[row][col] is the [x, y, width, height] rectangle of
        # that cell, relative to the cropped screenshot.
        self.data_coordinates = np.zeros((cells, cells), dtype=object)
        for row in range(cells):
            for col in range(cells):
                self.data_coordinates[row][col] = [col * dim, row * dim, dim, dim]

        # initialize the user-class
        self.config = UserConfigs()
        # initialize the window capture class
        self.capture_window = WindowCapture(None, None, self.config)
        # initialize the vision class
        self.vision_stun = Vision()

        # Template image per tile level, keyed by level (1..12).
        # NOTE(review): cv.imread returns None for a missing/unreadable file;
        # nothing here checks for that.
        self.needles = {
            level: cv.imread("pickaxe/{}.png".format(level), cv.IMREAD_COLOR)
            for level in range(1, 13)
        }

    def reset(self):
        """Reset the episode counters, click the two reset points and re-read the board.

        Returns:
            The freshly captured board observation.
        """
        self.episode_step = 0
        self.last_reward = 0
        self.last_score = 0
        self.kill_counter = 0

        # hit reset button and confirm
        # self.check_for_button_and_click_it("needles/repeat.jpg")
        # self.check_for_button_and_click_it("needles/reset.jpg")
        self.dig_point(1800, 160, 600)
        self.dig_point(1800, 1000, 300)

        self.observation, _ = self.get_current_board_state()
        return self.observation

    def shift_playfield(self, action):
        """Perform ``action`` on screen, re-read the board and compute a reward.

        Args:
            action: one of the MOVE_* action codes.

        Returns:
            Tuple ``(new_observation, reward, done)``. ``done`` becomes True
            once five consecutive moves left a board without empty cells
            unchanged.
        """
        self.episode_step += 1

        # move to indicated direction
        self.action(action)

        # get new field status
        new_observation, _ = self.get_current_board_state()
        empty_cells = int(np.count_nonzero(new_observation == 0))

        if mse(new_observation, self.observation) == 0.0:
            # Board identical to the previous one: the move had no effect.
            if empty_cells >= 1:
                reward = -100
            else:
                self.kill_counter += 1
                reward = -5
        else:
            self.kill_counter = 0
            # Board score: sum over all cells of 2**level - 1 (0 for empty cells).
            current_score = sum(2 ** value - 1 for value in new_observation.flat)
            reward = current_score - self.last_score + empty_cells
            self.last_score = current_score

        done = self.kill_counter >= 5
        if done:
            self.kill_counter = 0

        self.observation = new_observation

        return new_observation, reward, done

    def predict_best_move(self):
        """Read the board and return the possible move with the best heuristic score.

        Each direction reported by ``shift_possible`` is simulated and rated
        with ``shift_rule_checker``; ties go to the first direction in the
        order returned by ``shift_possible``.

        Returns:
            A MOVE_* action code. When no move is possible, ``MOVE_RIGHT`` is
            returned so the caller still issues a move and ``shift_playfield``
            can count the unchanged board toward ending the episode.
        """
        self.observation, _ = self.get_current_board_state()

        simulators = {
            self.MOVE_RIGHT: self.shif_right,
            self.MOVE_LEFT: self.shift_left,
            self.MOVE_UP: self.shift_up,
            self.MOVE_DOWN: self.shift_down,
        }

        scores = {}
        for direction in self.shift_possible():
            scores[direction] = self.shift_rule_checker(simulators[direction]())

        if not scores:
            return self.MOVE_RIGHT
        return max(scores, key=scores.get)

    def shift_rule_checker(self, shift_dir):
        """Rate a (simulated) board; higher is better.

        Rules, all evaluated on the right-most column (index 3), top to bottom:
          1. highest tile sits top right
          2. second/third/fourth highest distinct value directly below it
          3. each tile below is exactly one level lower than the one above
          4. empty cells in the right column are penalised
          5. the bottom-right tile can merge with, or is one level above,
             its neighbours in the third column

        Args:
            shift_dir: 4x4 board of tile levels (0 = empty).

        Returns:
            Integer score.
        """
        points = 0
        highest = np.max(shift_dir)
        # Distinct values in ascending order; may contain fewer than four entries.
        ranked = np.unique(shift_dir)

        def nth_highest(rank):
            # None when the board has fewer than ``rank`` distinct values.
            return ranked[-rank] if len(ranked) >= rank else None

        top = shift_dir[0][3]
        second = shift_dir[1][3]
        third = shift_dir[2][3]
        bottom = shift_dir[3][3]

        if top == highest:
            points += 1000
        else:
            points -= 10000

        if nth_highest(2) is not None and second == nth_highest(2):
            points += 300
        if third != 0 and nth_highest(3) is not None and third == nth_highest(3):
            points += 300
        if bottom != 0 and nth_highest(4) is not None and bottom == nth_highest(4):
            points += 300

        if second == highest - 1:
            points += 100
        if third == highest - 2:
            points += 100
        if bottom == highest - 3:
            points += 100

        # Gaps in the right column are penalised.
        if bottom == 0:
            points -= 500
        if third == 0:
            points -= 500
        if second == 0:
            points -= 500

        if bottom - 1 == shift_dir[3][2]:
            points += 200
        if bottom - 1 == shift_dir[2][2]:
            points += 200
        if bottom == shift_dir[3][2]:
            points += 300

        return points

    @staticmethod
    def _collapse_line(line):
        """Slide one line of tiles toward index 0, merging equal neighbours.

        Empty cells (0) are dropped, then adjacent equal tiles are merged
        pairwise starting at index 0; a merge produces ``level + 1`` and each
        tile takes part in at most one merge.

        NOTE(review): this assumes standard 2048-style merge rules; confirm
        against the actual game.

        Args:
            line: sequence of tile levels.

        Returns:
            List of the same length, padded with 0 at the end.
        """
        tiles = [value for value in line if value != 0]
        collapsed = []
        index = 0
        while index < len(tiles):
            if index + 1 < len(tiles) and tiles[index] == tiles[index + 1]:
                collapsed.append(tiles[index] + 1)
                index += 2
            else:
                collapsed.append(tiles[index])
                index += 1
        return collapsed + [0] * (len(line) - len(collapsed))

    def _simulate_shift(self, direction):
        """Return a copy of ``self.observation`` after moving in ``direction``.

        ``self.observation`` itself is left untouched.
        """
        size = self.GRID_SIZE
        board = copy(self.observation)
        horizontal = direction in (self.MOVE_LEFT, self.MOVE_RIGHT)
        # Right/down moves collapse toward the high index, so the line is
        # reversed before and after collapsing.
        toward_end = direction in (self.MOVE_RIGHT, self.MOVE_DOWN)

        for fixed in range(size):
            if horizontal:
                line = [self.observation[fixed][pos] for pos in range(size)]
            else:
                line = [self.observation[pos][fixed] for pos in range(size)]

            if toward_end:
                line.reverse()
            line = self._collapse_line(line)
            if toward_end:
                line.reverse()

            for pos, value in enumerate(line):
                if horizontal:
                    board[fixed][pos] = value
                else:
                    board[pos][fixed] = value

        return board

    def shif_right(self):
        """Simulate moving all tiles right; returns the resulting board copy."""
        return self._simulate_shift(self.MOVE_RIGHT)

    # Correctly spelled alias; ``shif_right`` is kept for existing callers.
    shift_right = shif_right

    def shift_left(self):
        """Simulate moving all tiles left; returns the resulting board copy."""
        return self._simulate_shift(self.MOVE_LEFT)

    def shift_up(self):
        """Simulate moving all tiles up; returns the resulting board copy."""
        return self._simulate_shift(self.MOVE_UP)

    def shift_down(self):
        """Simulate moving all tiles down; returns the resulting board copy."""
        return self._simulate_shift(self.MOVE_DOWN)

    def shift_possible(self):
        """List the directions in which at least one tile can move or merge.

        A direction is possible when some non-empty cell has, as its direct
        neighbour in that direction, either an empty cell or an equal tile.

        Returns:
            Sorted list of distinct MOVE_* action codes (possibly empty).
        """
        board = self.observation
        last = self.GRID_SIZE - 1
        directions = set()

        for row in range(self.GRID_SIZE):
            for col in range(self.GRID_SIZE):
                tile = board[row][col]
                if tile == 0:
                    continue
                if col < last and board[row][col + 1] in (tile, 0):
                    directions.add(self.MOVE_RIGHT)
                if col > 0 and board[row][col - 1] in (tile, 0):
                    directions.add(self.MOVE_LEFT)
                if row < last and board[row + 1][col] in (tile, 0):
                    directions.add(self.MOVE_DOWN)
                if row > 0 and board[row - 1][col] in (tile, 0):
                    directions.add(self.MOVE_UP)

        return sorted(directions)

    def action(self, choice):
        """Perform one of the 4 movement options (0, 1, 2, 3) on screen.

        Any other value of ``choice`` is ignored.
        """
        drag_targets = {
            self.MOVE_RIGHT: (1200, 598),
            self.MOVE_LEFT: (1000, 598),
            self.MOVE_UP: (1113, 498),
            self.MOVE_DOWN: (1113, 698),
        }
        target = drag_targets.get(choice)
        if target is not None:
            self.move_to(target[0], target[1])

    def move_to(self, x, y):
        """Drag the mouse from the fixed point (1113, 598) to ``(x, y)``.

        NOTE(review): the pauses rely on cv.waitKey, which per the OpenCV
        documentation only waits while a HighGUI window exists — confirm one
        is open when this runs.
        """
        point_src = (1113, 598)
        pydirectinput.moveTo(point_src[0], point_src[1])
        pydirectinput.mouseDown()
        # Random extra delay (1..100 ms) added to both pauses.
        w = random.randint(1, 100)
        cv.waitKey(150 + w)
        pydirectinput.moveTo(x, y)
        pydirectinput.mouseUp()
        cv.waitKey(500 + w)

    def change_value(self, x, y, val):
        """Store ``val`` at ``data_value_grid[x][y]``."""
        self.data_value_grid[x][y] = val

    def pointInRect(self, point):
        """Map a pixel position to the board cell that strictly contains it.

        Args:
            point: ``(x, y)`` in cropped-screenshot coordinates.

        Returns:
            ``(row, col)`` of the containing cell, or ``(None, None)`` when
            the point lies outside every cell or exactly on a cell border.
        """
        x, y = point
        for row in range(self.GRID_SIZE):
            for col in range(self.GRID_SIZE):
                x1, y1, w, h = self.data_coordinates[row][col]
                if x1 < x < x1 + w and y1 < y < y1 + h:
                    return row, col
        return None, None

    def assess_playfield_and_make_move(self):
        """Pick the best move for the current board and perform it."""
        action_direction = self.predict_best_move()
        self.shift_playfield(action_direction)

    def get_current_board_state(self):
        """Capture the game window and recognise the tile level in every cell.

        Returns:
            Tuple ``(board, screenshot)``: ``board`` is a 4x4 object array of
            tile levels (0 where no template matched) and ``screenshot`` is
            the cropped image that was analysed.
        """
        # get an updated image of the game, cropped to the 1000x1000 board area
        screenshot = self.capture_window.get_screenshot()
        screenshot = screenshot[200:1200, 650:1650]

        data_coords = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=object)
        for needle_key, needle in self.needles.items():
            # NOTE(review): 0.8 and 12 are presumably the match threshold and
            # the maximum number of results — verify against Vision.find.
            rectangles = self.vision_stun.find(screenshot, needle, 0.8, 12)
            if len(rectangles) == 0:
                continue

            for point in self.vision_stun.get_click_points(rectangles):
                row, col = self.pointInRect(point)
                if row is not None and col is not None:
                    data_coords[row][col] = int(needle_key)

        return data_coords, screenshot

    def check_for_button_and_click_it(self, button_url):
        """Click the button shown in image ``button_url`` if exactly one match is found.

        Args:
            button_url: path of the template image to look for in a fresh
                screenshot.
        """
        screenshot = self.capture_window.get_screenshot()
        needle = cv.imread(button_url, cv.IMREAD_UNCHANGED)
        rectangles = self.vision_stun.find(screenshot, needle, 0.7, 1)
        if len(rectangles) == 1:
            for click_point in self.vision_stun.get_click_points(rectangles):
                self.dig_point(click_point[0], click_point[1], 150)

    def dig_point(self, point1, point2, dig_time):
        """Move the mouse to ``(point1, point2)``, pause, then click.

        Args:
            point1: target x coordinate.
            point2: target y coordinate.
            dig_time: value passed to cv.waitKey before the button is pressed.
        """
        pydirectinput.moveTo(point1, point2)
        cv.waitKey(dig_time)
        pydirectinput.mouseDown()
        # Hold the button for a random 50..100 (passed to cv.waitKey).
        w = random.randint(50, 100)
        cv.waitKey(w)
        pydirectinput.mouseUp()