Module sverchok.utils.wfc_algorithm

The initial code was adapted from the Wave Function Collapse implementation in SideFX Labs for Houdini: https://github.com/sideeffects/SideFXLabs

Expand source code
"""
Initial code was taken from Houdini software implementation
https://github.com/sideeffects/SideFXLabs
"""

from itertools import chain

import numpy as np


class WaveFunctionCollapse:
    """Wave Function Collapse image synthesis based on overlapping NxN patterns.

    Typical use::

        wave = WaveFunctionCollapse(image)   # reads the input image and extracts patterns
        new_image = wave.solve((20, 20))     # generates the output image

    Cells of the output grid are addressed by a flat index ``x + y * width``.
    Each cell holds the set of pattern indices that are still allowed there;
    the "entropy" of a cell is simply the size of that set.
    """

    def __init__(
            self,
            image,
            patter_size=3,
            periodic_input=True,
            rotate_patterns=True):
        """Read the sample image and extract its patterns.

        :param image: object exposing ``shape`` and ``reshape(-1, 4).tolist()``
            (e.g. a numpy array of 4-component pixels).
        :param patter_size: edge length N of the NxN patterns (the parameter name
            keeps its historical spelling so keyword callers keep working).
        :param periodic_input: when True the pattern windows wrap around the
            borders of the input image.
        :param rotate_patterns: when True every window is also added in its
            90/180/270/360 degree rotations.
        """
        # NOTE(review): shape[0] is used as the row stride (width) and shape[1] as the
        # row count (height) of the flattened pixel list - verify for non-square inputs.
        self.input_grid_size = image.shape
        self.input_sample_image = image.reshape(-1, 4).tolist()
        self.periodic_input = periodic_input
        self.add_rotations = rotate_patterns
        self.pattern_size = patter_size

        self.output_grid_size = None
        self.seed = None
        self.tile_around_bounds = None

        self.patterns = []
        self.patterns_transforms = []
        self.pattern_frequencies = []
        self.number_of_unique_patterns = None
        self.output_grid = {}
        self.entropy_grid = {}
        # Optional flat index of the cell to collapse first; None means "pick randomly"
        self.solve_starting_point_index = None
        self.allowed_pattern_adjacencies = {}
        # Neighbor offsets (dx, dy): 0 = left, 1 = right, 2 = up, 3 = down
        self.nbr_directions = ((-1, 0), (1, 0), (0, -1), (0, 1))
        self.respect_user_constraints = False
        self.use_input_pattern_frequency = 1

        self.create_patterns_from_input()

    def solve(self,
              output_size=(10, 10),
              seed=0,
              tiling_output=False,
              max_number_contradiction_tries=1):
        """Generate an output image from the extracted patterns.

        :param output_size: (width, height) of the output grid in cells.
        :param seed: base seed; attempt ``k`` seeds numpy with ``seed + k * 100``.
        :param tiling_output: when True neighbors wrap around the grid borders so
            the result tiles seamlessly.
        :param max_number_contradiction_tries: number of attempts before giving up.
        :return: list of ``height`` rows, each a list of ``width`` pixels.
        :raises NotImplementedError: if ``respect_user_constraints`` is enabled.
        :raises RuntimeError: if every attempt ended in a contradiction.
        """
        self.output_grid_size = output_size
        self.tile_around_bounds = tiling_output

        if self.respect_user_constraints:
            # The original port referenced an undefined ForceUserConstraints() here
            raise NotImplementedError("User constraints are not supported by this implementation")

        # Adjacency rules depend only on the patterns, so compute them once per solve
        self.calculate_adjacencies()

        for solve_attempt in range(max_number_contradiction_tries):

            # Set the seed for our random picking of values
            np.random.seed(seed + solve_attempt * 100)

            # Start every attempt from a clean grid
            self.initialize_grid()
            self.initialize_entropy_grid()

            if self.run_wfc_solve():
                return self.assign_wave_to_output()

        # If we have exceeded the number of retries for the solve,
        # we will throw an error to tell the user no solution has been found
        raise RuntimeError("Looks like solution for current input parameters can't be found. "
                           "Try to change seed or number of contradiction tries")

    def create_patterns_from_input(self):
        """Cut the input image into NxN windows and store the unique ones.

        Fills ``patterns`` (flattened row-major tuples of pixels),
        ``patterns_transforms`` (``[rotation, 1, 1]`` of the first occurrence),
        ``pattern_frequencies`` (occurrence counts) and ``number_of_unique_patterns``.
        """
        width, height = self.input_grid_size[0], self.input_grid_size[1]
        size = self.pattern_size

        # Without periodic input the window must fit completely inside the image
        offset = 0 if self.periodic_input else size - 1

        all_temp_patterns = []
        all_temp_patterns_transforms = []

        for y in range(height - offset):
            for x in range(width - offset):
                # Rows of the window anchored at (x, y); indices wrap around the image
                window = tuple(
                    tuple(self.input_sample_image[(x + column) % width + ((y + row) % height) * width]
                          for column in range(size))
                    for row in range(size))

                if not self.add_rotations:
                    all_temp_patterns.append(tuple(chain.from_iterable(window)))
                    all_temp_patterns_transforms.append([0, 1, 1])
                    continue

                rotated = window
                for quarter_turn in range(4):
                    # Reversing the rows and transposing rotates the window by 90 degrees
                    rotated = list(zip(*rotated[::-1]))
                    all_temp_patterns.append(tuple(chain.from_iterable(rotated)))
                    all_temp_patterns_transforms.append([(quarter_turn + 1) * 90, 1, 1])

        self.patterns = []
        self.patterns_transforms = []
        self.pattern_frequencies = []

        # Pixels are lists (unhashable), so use a tuple-of-tuples key for O(1) lookup
        index_by_key = {}
        for pattern, transform in zip(all_temp_patterns, all_temp_patterns_transforms):
            key = tuple(tuple(pixel) for pixel in pattern)
            index = index_by_key.get(key)
            if index is None:
                index_by_key[key] = len(self.patterns)
                self.patterns.append(pattern)
                self.pattern_frequencies.append(1)
                self.patterns_transforms.append(transform)
            else:
                self.pattern_frequencies[index] += 1

        self.number_of_unique_patterns = len(self.pattern_frequencies)

    def initialize_grid(self):
        """Reset the output grid so that every cell allows every pattern."""
        cell_count = self.output_grid_size[0] * self.output_grid_size[1]
        # Rebuild the mapping so cells of a previous, larger solve cannot linger
        self.output_grid = {cell: set(range(self.number_of_unique_patterns)) for cell in range(cell_count)}

    def initialize_entropy_grid(self):
        """Reset the entropy grid and mark the cell that is collapsed first.

        Entropy is the number of remaining legal patterns of a cell.  The start
        cell is ``solve_starting_point_index`` when it is set and lies inside the
        grid, otherwise it is drawn from ``np.random`` (the attribute itself is
        not modified, so equal seeds give equal results on repeated solves).
        """
        cell_count = self.output_grid_size[0] * self.output_grid_size[1]
        self.entropy_grid = dict.fromkeys(range(cell_count), self.number_of_unique_patterns)
        if cell_count == 0:
            return

        start = self.solve_starting_point_index
        if start is None or not 0 <= start < cell_count:
            start = np.random.randint(cell_count)

        # Give the start cell the lowest entropy so it gets picked first
        self.entropy_grid[start] = self.number_of_unique_patterns - 1

    def calculate_adjacencies(self):
        """Compute which patterns may sit next to each other.

        ``allowed_pattern_adjacencies[p][d]`` is the set of pattern indices allowed
        in the neighbor cell in direction ``d`` of a cell holding pattern ``p``
        (0 = left, 1 = right, 2 = up, 3 = down).  Two patterns are compatible when
        their overlapping N-1 columns (or rows) are equal.
        """
        size = self.pattern_size
        count = self.number_of_unique_patterns
        direction_count = len(self.nbr_directions)

        adjacencies = {index: tuple(set() for _ in range(direction_count)) for index in range(count)}
        self.allowed_pattern_adjacencies = adjacencies

        # Extract the overlap regions once per pattern instead of once per pair
        without_last_column = []
        without_first_column = []
        without_last_row = []
        without_first_row = []
        for index in range(count):
            pattern = self.patterns[index]
            without_last_column.append([px for i, px in enumerate(pattern) if i % size != size - 1])
            without_first_column.append([px for i, px in enumerate(pattern) if i % size != 0])
            without_last_row.append(pattern[:size * size - size])
            without_first_row.append(pattern[size:])

        for first in range(count):
            for second in range(count):
                # "second" shifted one column to the left overlaps "first"
                if without_last_column[first] == without_first_column[second]:
                    adjacencies[first][0].add(second)
                    adjacencies[second][1].add(first)

                # "second" shifted one row up overlaps "first"
                if without_last_row[first] == without_first_row[second]:
                    adjacencies[first][2].add(second)
                    adjacencies[second][3].add(first)

    def run_wfc_solve(self):
        """Collapse cells until the grid is solved.

        :return: True when every cell collapsed, False on a contradiction.
        """
        while self.entropy_grid:
            # Find the cell with the lowest entropy value, and assign a random valid pattern
            lowest_entropy_cell = self.get_lowest_entropy_cell()
            pattern_index_for_cell = self.get_random_allowed_pattern_index_from_cell(lowest_entropy_cell)
            self.assign_pattern_to_cell(lowest_entropy_cell, pattern_index_for_cell)

            # Propagate the consequences of the collapse through the grid
            consistent, _ = self.propagate_grid_cells(lowest_entropy_cell)
            if not consistent:
                return False

        return True

    def get_lowest_entropy_cell(self):
        """Return the index of the not yet collapsed cell with the lowest entropy."""
        return min(self.entropy_grid, key=self.entropy_grid.get)

    def get_random_allowed_pattern_index_from_cell(self, cell):
        """Pick a random pattern index among those still allowed in *cell*.

        When ``use_input_pattern_frequency`` is 1 every pattern is weighted by the
        number of times it occurred in the input, otherwise the pick is uniform.
        """
        allowed = self.output_grid[cell]
        if self.use_input_pattern_frequency == 1:
            # Repeat each index once per occurrence to get a weighted draw
            candidates = [pattern_index for pattern_index in allowed
                          for _ in range(self.pattern_frequencies[pattern_index])]
        else:
            candidates = list(allowed)
        return np.random.choice(candidates)

    def assign_pattern_to_cell(self, cell, pattern_index):
        """Collapse *cell* to *pattern_index* and remove it from the entropy grid."""
        self.output_grid[cell] = {pattern_index}
        del self.entropy_grid[cell]

    def propagate_grid_cells(self, cell):
        """Propagate the constraints of the just collapsed *cell* through the grid.

        :return: ``(True, 0)`` on success, ``(False, 1)`` when some neighbor is left
            without any allowed pattern (contradiction).
        """
        width, height = self.output_grid_size[0], self.output_grid_size[1]

        # Cells whose allowed patterns changed and whose neighbors need re-checking
        to_update_stack = {cell}
        while to_update_stack:
            cell_index = to_update_stack.pop()
            cell_x, cell_y = cell_index % width, cell_index // width

            for direction, (shift_x, shift_y) in enumerate(self.nbr_directions):
                neighbor_x, neighbor_y = cell_x + shift_x, cell_y + shift_y

                if self.tile_around_bounds:
                    neighbor_x %= width
                    neighbor_y %= height
                elif not (0 <= neighbor_x < width and 0 <= neighbor_y < height):
                    # Without tiling there is no neighbor beyond the border
                    continue

                neighbor_cell_index = neighbor_x + neighbor_y * width

                # Collapsed cells are no longer part of the entropy grid
                if neighbor_cell_index not in self.entropy_grid:
                    continue

                # Everything the current cell permits in this direction
                pattern_indices_in_cell = {n for pattern_index in self.output_grid[cell_index] for n in
                                           self.allowed_pattern_adjacencies[pattern_index][direction]}

                # Everything the neighbor still allows itself
                pattern_indices_in_neighbor_cell = self.output_grid[neighbor_cell_index]

                # Nothing to do when the neighbor is already within the permitted set
                if pattern_indices_in_neighbor_cell.issubset(pattern_indices_in_cell):
                    continue

                shared_pattern_indices = {index for index in pattern_indices_in_cell
                                          if index in pattern_indices_in_neighbor_cell}

                if not shared_pattern_indices:
                    return False, 1

                self.output_grid[neighbor_cell_index] = shared_pattern_indices
                self.entropy_grid[neighbor_cell_index] = len(shared_pattern_indices)
                to_update_stack.add(neighbor_cell_index)

        return True, 0

    def assign_wave_to_output(self):
        """Convert the solved grid into rows of pixels.

        Every cell contributes the first (top-left) pixel of its pattern.
        """
        flat_out_image = [self.patterns[next(iter(allowed))][0] for allowed in self.output_grid.values()]

        row_length = self.output_grid_size[0]
        out_image = []
        for i_row in range(self.output_grid_size[1]):
            row_start = i_row * row_length
            out_image.append(flat_out_image[row_start: row_start + row_length])

        return out_image


# def ForceUserConstraints():
    # This function handles assigning user constraints based on attached attribute values to the output grid
#     for cellindex, cellvalue in enumerate(UserConstraintAttributes):
#         if cellvalue != \"WFC_Initialize\" and cellvalue in UserConstraintAttributes:
#             AllowedIndices = [x for x in range(len(Patterns)) if cellvalue == Patterns[x][0]]
#             # print len(AllowedIndices)
#
#             PickedIndex = np.random.choice([PatternIndex for PatternIndex in AllowedIndices])
#             assign_pattern_to_cell(cellindex, PickedIndex)
#             Running, Error = propagate_grid_cells(cellindex)
#             if Error == 1:
#                 return False
#
#     return True

Classes

class WaveFunctionCollapse (image, patter_size=3, periodic_input=True, rotate_patterns=True)
Expand source code
class WaveFunctionCollapse:
    """Wave Function Collapse image synthesis based on overlapping NxN patterns.

    Typical use::

        wave = WaveFunctionCollapse(image)   # reads the input image and extracts patterns
        new_image = wave.solve((20, 20))     # generates the output image

    Cells of the output grid are addressed by a flat index ``x + y * width``.
    Each cell holds the set of pattern indices that are still allowed there;
    the "entropy" of a cell is simply the size of that set.
    """

    def __init__(
            self,
            image,
            patter_size=3,
            periodic_input=True,
            rotate_patterns=True):
        """Read the sample image and extract its patterns.

        :param image: object exposing ``shape`` and ``reshape(-1, 4).tolist()``
            (e.g. a numpy array of 4-component pixels).
        :param patter_size: edge length N of the NxN patterns (the parameter name
            keeps its historical spelling so keyword callers keep working).
        :param periodic_input: when True the pattern windows wrap around the
            borders of the input image.
        :param rotate_patterns: when True every window is also added in its
            90/180/270/360 degree rotations.
        """
        # NOTE(review): shape[0] is used as the row stride (width) and shape[1] as the
        # row count (height) of the flattened pixel list - verify for non-square inputs.
        self.input_grid_size = image.shape
        self.input_sample_image = image.reshape(-1, 4).tolist()
        self.periodic_input = periodic_input
        self.add_rotations = rotate_patterns
        self.pattern_size = patter_size

        self.output_grid_size = None
        self.seed = None
        self.tile_around_bounds = None

        self.patterns = []
        self.patterns_transforms = []
        self.pattern_frequencies = []
        self.number_of_unique_patterns = None
        self.output_grid = {}
        self.entropy_grid = {}
        # Optional flat index of the cell to collapse first; None means "pick randomly"
        self.solve_starting_point_index = None
        self.allowed_pattern_adjacencies = {}
        # Neighbor offsets (dx, dy): 0 = left, 1 = right, 2 = up, 3 = down
        self.nbr_directions = ((-1, 0), (1, 0), (0, -1), (0, 1))
        self.respect_user_constraints = False
        self.use_input_pattern_frequency = 1

        self.create_patterns_from_input()

    def solve(self,
              output_size=(10, 10),
              seed=0,
              tiling_output=False,
              max_number_contradiction_tries=1):
        """Generate an output image from the extracted patterns.

        :param output_size: (width, height) of the output grid in cells.
        :param seed: base seed; attempt ``k`` seeds numpy with ``seed + k * 100``.
        :param tiling_output: when True neighbors wrap around the grid borders so
            the result tiles seamlessly.
        :param max_number_contradiction_tries: number of attempts before giving up.
        :return: list of ``height`` rows, each a list of ``width`` pixels.
        :raises NotImplementedError: if ``respect_user_constraints`` is enabled.
        :raises RuntimeError: if every attempt ended in a contradiction.
        """
        self.output_grid_size = output_size
        self.tile_around_bounds = tiling_output

        if self.respect_user_constraints:
            # The original port referenced an undefined ForceUserConstraints() here
            raise NotImplementedError("User constraints are not supported by this implementation")

        # Adjacency rules depend only on the patterns, so compute them once per solve
        self.calculate_adjacencies()

        for solve_attempt in range(max_number_contradiction_tries):

            # Set the seed for our random picking of values
            np.random.seed(seed + solve_attempt * 100)

            # Start every attempt from a clean grid
            self.initialize_grid()
            self.initialize_entropy_grid()

            if self.run_wfc_solve():
                return self.assign_wave_to_output()

        # If we have exceeded the number of retries for the solve,
        # we will throw an error to tell the user no solution has been found
        raise RuntimeError("Looks like solution for current input parameters can't be found. "
                           "Try to change seed or number of contradiction tries")

    def create_patterns_from_input(self):
        """Cut the input image into NxN windows and store the unique ones.

        Fills ``patterns`` (flattened row-major tuples of pixels),
        ``patterns_transforms`` (``[rotation, 1, 1]`` of the first occurrence),
        ``pattern_frequencies`` (occurrence counts) and ``number_of_unique_patterns``.
        """
        width, height = self.input_grid_size[0], self.input_grid_size[1]
        size = self.pattern_size

        # Without periodic input the window must fit completely inside the image
        offset = 0 if self.periodic_input else size - 1

        all_temp_patterns = []
        all_temp_patterns_transforms = []

        for y in range(height - offset):
            for x in range(width - offset):
                # Rows of the window anchored at (x, y); indices wrap around the image
                window = tuple(
                    tuple(self.input_sample_image[(x + column) % width + ((y + row) % height) * width]
                          for column in range(size))
                    for row in range(size))

                if not self.add_rotations:
                    all_temp_patterns.append(tuple(chain.from_iterable(window)))
                    all_temp_patterns_transforms.append([0, 1, 1])
                    continue

                rotated = window
                for quarter_turn in range(4):
                    # Reversing the rows and transposing rotates the window by 90 degrees
                    rotated = list(zip(*rotated[::-1]))
                    all_temp_patterns.append(tuple(chain.from_iterable(rotated)))
                    all_temp_patterns_transforms.append([(quarter_turn + 1) * 90, 1, 1])

        self.patterns = []
        self.patterns_transforms = []
        self.pattern_frequencies = []

        # Pixels are lists (unhashable), so use a tuple-of-tuples key for O(1) lookup
        index_by_key = {}
        for pattern, transform in zip(all_temp_patterns, all_temp_patterns_transforms):
            key = tuple(tuple(pixel) for pixel in pattern)
            index = index_by_key.get(key)
            if index is None:
                index_by_key[key] = len(self.patterns)
                self.patterns.append(pattern)
                self.pattern_frequencies.append(1)
                self.patterns_transforms.append(transform)
            else:
                self.pattern_frequencies[index] += 1

        self.number_of_unique_patterns = len(self.pattern_frequencies)

    def initialize_grid(self):
        """Reset the output grid so that every cell allows every pattern."""
        cell_count = self.output_grid_size[0] * self.output_grid_size[1]
        # Rebuild the mapping so cells of a previous, larger solve cannot linger
        self.output_grid = {cell: set(range(self.number_of_unique_patterns)) for cell in range(cell_count)}

    def initialize_entropy_grid(self):
        """Reset the entropy grid and mark the cell that is collapsed first.

        Entropy is the number of remaining legal patterns of a cell.  The start
        cell is ``solve_starting_point_index`` when it is set and lies inside the
        grid, otherwise it is drawn from ``np.random`` (the attribute itself is
        not modified, so equal seeds give equal results on repeated solves).
        """
        cell_count = self.output_grid_size[0] * self.output_grid_size[1]
        self.entropy_grid = dict.fromkeys(range(cell_count), self.number_of_unique_patterns)
        if cell_count == 0:
            return

        start = self.solve_starting_point_index
        if start is None or not 0 <= start < cell_count:
            start = np.random.randint(cell_count)

        # Give the start cell the lowest entropy so it gets picked first
        self.entropy_grid[start] = self.number_of_unique_patterns - 1

    def calculate_adjacencies(self):
        """Compute which patterns may sit next to each other.

        ``allowed_pattern_adjacencies[p][d]`` is the set of pattern indices allowed
        in the neighbor cell in direction ``d`` of a cell holding pattern ``p``
        (0 = left, 1 = right, 2 = up, 3 = down).  Two patterns are compatible when
        their overlapping N-1 columns (or rows) are equal.
        """
        size = self.pattern_size
        count = self.number_of_unique_patterns
        direction_count = len(self.nbr_directions)

        adjacencies = {index: tuple(set() for _ in range(direction_count)) for index in range(count)}
        self.allowed_pattern_adjacencies = adjacencies

        # Extract the overlap regions once per pattern instead of once per pair
        without_last_column = []
        without_first_column = []
        without_last_row = []
        without_first_row = []
        for index in range(count):
            pattern = self.patterns[index]
            without_last_column.append([px for i, px in enumerate(pattern) if i % size != size - 1])
            without_first_column.append([px for i, px in enumerate(pattern) if i % size != 0])
            without_last_row.append(pattern[:size * size - size])
            without_first_row.append(pattern[size:])

        for first in range(count):
            for second in range(count):
                # "second" shifted one column to the left overlaps "first"
                if without_last_column[first] == without_first_column[second]:
                    adjacencies[first][0].add(second)
                    adjacencies[second][1].add(first)

                # "second" shifted one row up overlaps "first"
                if without_last_row[first] == without_first_row[second]:
                    adjacencies[first][2].add(second)
                    adjacencies[second][3].add(first)

    def run_wfc_solve(self):
        """Collapse cells until the grid is solved.

        :return: True when every cell collapsed, False on a contradiction.
        """
        while self.entropy_grid:
            # Find the cell with the lowest entropy value, and assign a random valid pattern
            lowest_entropy_cell = self.get_lowest_entropy_cell()
            pattern_index_for_cell = self.get_random_allowed_pattern_index_from_cell(lowest_entropy_cell)
            self.assign_pattern_to_cell(lowest_entropy_cell, pattern_index_for_cell)

            # Propagate the consequences of the collapse through the grid
            consistent, _ = self.propagate_grid_cells(lowest_entropy_cell)
            if not consistent:
                return False

        return True

    def get_lowest_entropy_cell(self):
        """Return the index of the not yet collapsed cell with the lowest entropy."""
        return min(self.entropy_grid, key=self.entropy_grid.get)

    def get_random_allowed_pattern_index_from_cell(self, cell):
        """Pick a random pattern index among those still allowed in *cell*.

        When ``use_input_pattern_frequency`` is 1 every pattern is weighted by the
        number of times it occurred in the input, otherwise the pick is uniform.
        """
        allowed = self.output_grid[cell]
        if self.use_input_pattern_frequency == 1:
            # Repeat each index once per occurrence to get a weighted draw
            candidates = [pattern_index for pattern_index in allowed
                          for _ in range(self.pattern_frequencies[pattern_index])]
        else:
            candidates = list(allowed)
        return np.random.choice(candidates)

    def assign_pattern_to_cell(self, cell, pattern_index):
        """Collapse *cell* to *pattern_index* and remove it from the entropy grid."""
        self.output_grid[cell] = {pattern_index}
        del self.entropy_grid[cell]

    def propagate_grid_cells(self, cell):
        """Propagate the constraints of the just collapsed *cell* through the grid.

        :return: ``(True, 0)`` on success, ``(False, 1)`` when some neighbor is left
            without any allowed pattern (contradiction).
        """
        width, height = self.output_grid_size[0], self.output_grid_size[1]

        # Cells whose allowed patterns changed and whose neighbors need re-checking
        to_update_stack = {cell}
        while to_update_stack:
            cell_index = to_update_stack.pop()
            cell_x, cell_y = cell_index % width, cell_index // width

            for direction, (shift_x, shift_y) in enumerate(self.nbr_directions):
                neighbor_x, neighbor_y = cell_x + shift_x, cell_y + shift_y

                if self.tile_around_bounds:
                    neighbor_x %= width
                    neighbor_y %= height
                elif not (0 <= neighbor_x < width and 0 <= neighbor_y < height):
                    # Without tiling there is no neighbor beyond the border
                    continue

                neighbor_cell_index = neighbor_x + neighbor_y * width

                # Collapsed cells are no longer part of the entropy grid
                if neighbor_cell_index not in self.entropy_grid:
                    continue

                # Everything the current cell permits in this direction
                pattern_indices_in_cell = {n for pattern_index in self.output_grid[cell_index] for n in
                                           self.allowed_pattern_adjacencies[pattern_index][direction]}

                # Everything the neighbor still allows itself
                pattern_indices_in_neighbor_cell = self.output_grid[neighbor_cell_index]

                # Nothing to do when the neighbor is already within the permitted set
                if pattern_indices_in_neighbor_cell.issubset(pattern_indices_in_cell):
                    continue

                shared_pattern_indices = {index for index in pattern_indices_in_cell
                                          if index in pattern_indices_in_neighbor_cell}

                if not shared_pattern_indices:
                    return False, 1

                self.output_grid[neighbor_cell_index] = shared_pattern_indices
                self.entropy_grid[neighbor_cell_index] = len(shared_pattern_indices)
                to_update_stack.add(neighbor_cell_index)

        return True, 0

    def assign_wave_to_output(self):
        """Convert the solved grid into rows of pixels.

        Every cell contributes the first (top-left) pixel of its pattern.
        """
        flat_out_image = [self.patterns[next(iter(allowed))][0] for allowed in self.output_grid.values()]

        row_length = self.output_grid_size[0]
        out_image = []
        for i_row in range(self.output_grid_size[1]):
            row_start = i_row * row_length
            out_image.append(flat_out_image[row_start: row_start + row_length])

        return out_image


# def ForceUserConstraints():
    # This function handles assigning user constraints based on attached attribute values to the output grid

Methods

def assign_pattern_to_cell(self, cell, pattern_index)
Expand source code
def assign_pattern_to_cell(self, cell, pattern_index):
    """Collapse *cell* to the single chosen pattern index.

    The cell's candidate set in ``output_grid`` is replaced by ``{pattern_index}``
    and the cell is dropped from ``entropy_grid`` (it has collapsed).
    """
    self.output_grid[cell] = {pattern_index}
    # pop without a default raises KeyError for an unknown cell, just like ``del``
    self.entropy_grid.pop(cell)
def assign_wave_to_output(self)
Expand source code
def assign_wave_to_output(self):
    """Turn the solved grid into rows of output values.

    For every cell (in ``output_grid`` iteration order) one of its remaining
    pattern indices is taken and the first element of that pattern is emitted.
    The flat result is then cut into ``output_grid_size[1]`` rows of
    ``output_grid_size[0]`` entries each.
    """
    first_values = [
        self.patterns[next(iter(candidates))][0]
        for candidates in self.output_grid.values()
    ]

    row_length = self.output_grid_size[0]
    row_count = self.output_grid_size[1]
    return [
        first_values[row * row_length: row * row_length + row_length]
        for row in range(row_count)
    ]
def calculate_adjacencies(self)
Expand source code
def calculate_adjacencies(self):
    """Compute which patterns may sit next to each other.

    ``allowed_pattern_adjacencies[p][d]`` becomes the set of pattern indices
    allowed in the neighbor cell in direction ``d`` of a cell holding pattern
    ``p`` (0 = left, 1 = right, 2 = up, 3 = down).  Two patterns are compatible
    when their overlapping N-1 columns (or rows) are equal.

    The mapping is rebuilt from scratch on every call, and the overlap regions
    are extracted once per pattern rather than once per compared pair.
    """
    size = self.pattern_size
    count = self.number_of_unique_patterns
    direction_count = len(self.nbr_directions)

    adjacencies = {index: tuple(set() for _ in range(direction_count)) for index in range(count)}
    self.allowed_pattern_adjacencies = adjacencies

    # Overlap regions of every pattern (patterns are flattened row by row)
    without_last_column = []
    without_first_column = []
    without_last_row = []
    without_first_row = []
    for index in range(count):
        pattern = self.patterns[index]
        without_last_column.append([px for i, px in enumerate(pattern) if i % size != size - 1])
        without_first_column.append([px for i, px in enumerate(pattern) if i % size != 0])
        without_last_row.append(pattern[:size * size - size])
        without_first_row.append(pattern[size:])

    # Comparing patterns to each other
    for first in range(count):
        for second in range(count):
            # Column compatibility: "second" fits to the left of "first"
            if without_last_column[first] == without_first_column[second]:
                adjacencies[first][0].add(second)
                adjacencies[second][1].add(first)

            # Row compatibility: "second" fits above "first"
            if without_last_row[first] == without_first_row[second]:
                adjacencies[first][2].add(second)
                adjacencies[second][3].add(first)
def create_patterns_from_input(self)
Expand source code
def create_patterns_from_input(self):
    """Cut the input sample into ``pattern_size`` x ``pattern_size`` patterns.

    The flat ``self.input_sample_image`` list is addressed as
    ``x + y * input_grid_size[0]``. A window is taken at every grid position
    (wrapping around the borders when ``self.periodic_input`` is set, otherwise
    only where the window fits). When ``self.add_rotations`` is set, each window
    contributes its four successive quarter-turn rotations instead of itself.

    Results are stored on the instance:

    * ``self.patterns`` - unique patterns, each flattened row by row into a tuple;
    * ``self.patterns_transforms`` - ``[angle, 1, 1]`` of the first occurrence of each pattern;
    * ``self.pattern_frequencies`` - how many times each unique pattern was found;
    * ``self.number_of_unique_patterns`` - number of unique patterns.
    """
    size = self.pattern_size
    stride = self.input_grid_size[0]
    row_count = self.input_grid_size[1]
    pixels = self.input_sample_image

    # Without periodic input the window must fit entirely inside the sample
    margin = 0 if self.periodic_input else size - 1

    found_patterns = []
    found_transforms = []

    for row in range(row_count - margin):
        for column in range(stride - margin):

            # Gather the window row by row, wrapping indices around the sample borders
            window = tuple(
                tuple(pixels[(column + dx) % stride + ((row + dy) % row_count) * stride] for dx in range(size))
                for dy in range(size))

            # This is where variations would take place

            if self.add_rotations:
                for quarter_turn in range(1, 5):
                    # Reversing the rows and transposing rotates the window by 90 degrees
                    window = list(zip(*window[::-1]))
                    found_patterns.append(window)
                    found_transforms.append([quarter_turn * 90, 1, 1])
                    # Maybe add horizontal flip too??
            else:
                found_patterns.append(window)
                found_transforms.append([0, 1, 1])

    flat_patterns = [tuple(chain.from_iterable(rows)) for rows in found_patterns]

    self.patterns = []
    self.patterns_transforms = []
    self.pattern_frequencies = []

    # Keep the first occurrence of every pattern and count the repetitions
    for flat, transform in zip(flat_patterns, found_transforms):
        try:
            known_index = self.patterns.index(flat)
        except ValueError:
            self.patterns.append(flat)
            self.pattern_frequencies.append(1)
            self.patterns_transforms.append(transform)
        else:
            self.pattern_frequencies[known_index] += 1

    self.number_of_unique_patterns = len(self.pattern_frequencies)
def get_lowest_entropy_cell(self)
Expand source code
def get_lowest_entropy_cell(self):
    """Return the key of the ``self.entropy_grid`` entry holding the smallest value.

    When several cells share the smallest value, the one that comes first in
    the dictionary's iteration order is returned.
    """
    cell_index, _ = min(self.entropy_grid.items(), key=lambda entry: entry[1])
    return cell_index
def get_random_allowed_pattern_index_from_cell(self, cell)
Expand source code
def get_random_allowed_pattern_index_from_cell(self, cell):
    """Pick a random pattern index among those still allowed in ``cell``.

    When ``self.use_input_pattern_frequency`` equals 1 the pick is weighted:
    every allowed pattern index is listed as many times as its entry in
    ``self.pattern_frequencies``. Otherwise all allowed indices are equally
    likely. The draw uses the global ``np.random`` state.
    """
    allowed = self.output_grid[cell]

    if self.use_input_pattern_frequency == 1:
        # Repeat each candidate according to how often it was found in the input
        candidates = []
        for pattern_index in allowed:
            candidates.extend([pattern_index] * self.pattern_frequencies[pattern_index])
    else:
        candidates = list(allowed)

    return np.random.choice(candidates)
def initialize_entropy_grid(self)
Expand source code
def initialize_entropy_grid(self):
    """Reset ``self.entropy_grid`` for a new solve attempt.

    The entropy grid matches the output grid, but stores entropy values
    (entropy = number of remaining legal patterns). Every cell starts with
    ``self.number_of_unique_patterns``; the starting cell gets one less so that
    it is the first one returned as lowest-entropy cell.

    The starting cell is ``self.solve_starting_point_index``. When it is
    ``None``, or does not address a cell of the current output grid (for
    example a value remembered from an earlier solve with a larger output), a
    random cell is drawn from ``np.random`` and remembered in that attribute.
    """
    cell_count = self.output_grid_size[0] * self.output_grid_size[1]

    # Drop whatever is left from a previous solve: cells outside the current
    # output size would otherwise survive and be picked as lowest-entropy cells.
    self.entropy_grid.clear()
    for cell_index in range(cell_count):
        self.entropy_grid[cell_index] = self.number_of_unique_patterns

    # Pick starting point for solve. (Random if not specified or not inside the grid)
    start = self.solve_starting_point_index
    if start is None or not 0 <= start < cell_count:
        start = np.random.randint(cell_count)
        self.solve_starting_point_index = start

    self.entropy_grid[start] = self.number_of_unique_patterns - 1
def initialize_grid(self)
Expand source code
def initialize_grid(self):
    """Reset ``self.output_grid`` (used for solving in) for a new solve attempt.

    Each of the ``output_grid_size[0] * output_grid_size[1]`` cells receives
    its own set holding every pattern index, i.e. nothing is ruled out yet.
    """
    cell_count = self.output_grid_size[0] * self.output_grid_size[1]

    # Remove cells left over from a previous solve so that a smaller output
    # size does not keep entries outside the current grid.
    self.output_grid.clear()
    for cell_index in range(cell_count):
        self.output_grid[cell_index] = set(range(self.number_of_unique_patterns))
def propagate_grid_cells(self, cell)
Expand source code
def propagate_grid_cells(self, cell):
    """Propagate the pattern restrictions of ``cell`` through the output grid.

    Starting from the just-collapsed ``cell``, every neighbour that is still
    present in ``self.entropy_grid`` is reduced to the patterns compatible with
    the cell it was reached from; every neighbour that changed is processed in
    turn until nothing changes any more.

    Cells are addressed as ``x + y * output_grid_size[0]``. When
    ``self.tile_around_bounds`` is set, neighbours wrap around the grid
    borders; otherwise neighbours outside the grid are ignored.

    Returns:
        ``(True, 0)`` when propagation finished, ``(False, 1)`` as soon as a
        neighbour is left without any allowed pattern (contradiction).
    """
    width = self.output_grid_size[0]
    height = self.output_grid_size[1]

    # We are using a stack to add newly found to-be-updated cells to
    to_update_stack = {cell}
    while to_update_stack:
        cell_index = to_update_stack.pop()
        cell_x = cell_index % width
        cell_y = cell_index // width

        # loop through neighbor cells of currently propagated cell
        for direction, (step_x, step_y) in enumerate(self.nbr_directions):
            neighbor_x = cell_x + step_x
            neighbor_y = cell_y + step_y

            if self.tile_around_bounds:
                neighbor_x %= width
                neighbor_y %= height
            elif not (0 <= neighbor_x < width and 0 <= neighbor_y < height):
                # If the user does not want the WFC solve to create a tiling output,
                # a neighbor beyond the border is invalid and is not propagated.
                # Checking the unwrapped coordinates is exact for every grid shape,
                # including non-square grids and grids that are only two cells wide or high.
                continue

            neighbor_cell_index = neighbor_x + neighbor_y * width  # index of neighboring cell

            # Only cells that have not been collapsed yet are updated
            if neighbor_cell_index not in self.entropy_grid:
                continue

            # These are all the allowed patterns for the direction of the checked neighbor cell
            pattern_indices_in_cell = {n for pattern_index in self.output_grid[cell_index] for n in
                                       self.allowed_pattern_adjacencies[pattern_index][direction]}

            # These are all the patterns the neighbor allows itself
            pattern_indices_in_neighbor_cell = self.output_grid[neighbor_cell_index]

            # Nothing to update when the neighbor's patterns are already
            # fully contained in the set of patterns allowed by the current cell
            if pattern_indices_in_neighbor_cell.issubset(pattern_indices_in_cell):
                continue

            shared_cell_and_neighbor_pattern_indices = {
                index for index in pattern_indices_in_cell if index in pattern_indices_in_neighbor_cell}

            # No pattern left for the neighbor: contradiction
            if not shared_cell_and_neighbor_pattern_indices:
                return False, 1

            self.output_grid[neighbor_cell_index] = shared_cell_and_neighbor_pattern_indices
            self.entropy_grid[neighbor_cell_index] = len(shared_cell_and_neighbor_pattern_indices)
            to_update_stack.add(neighbor_cell_index)

    return True, 0
def run_wfc_solve(self)
Expand source code
def run_wfc_solve(self):
    """Run the actual WFC solve loop.

    Repeatedly takes the cell with the lowest entropy, assigns it a random
    allowed pattern index and propagates the consequences, until either the
    propagation stops the loop or ``self.entropy_grid`` is empty (all cells
    collapsed).

    Returns:
        ``False`` when the last propagation reported error code 1
        (contradiction while propagating), ``True`` otherwise.
    """
    while True:
        # Collapse the cell with the lowest entropy value to a random valid pattern index
        cell = self.get_lowest_entropy_cell()
        chosen_pattern = self.get_random_allowed_pattern_index_from_cell(cell)
        self.assign_pattern_to_cell(cell, chosen_pattern)

        # Propagate the output grid after collapsing the cell
        keep_going, error = self.propagate_grid_cells(cell)

        # Stop on a failed propagation or once every cell has collapsed (done solving)
        if not keep_going or not self.entropy_grid:
            break

    return error != 1
def solve(self, output_size=(10, 10), seed=0, tiling_output=False, max_number_contradiction_tries=1)
Expand source code
def solve(self,
          output_size=(10, 10),
          seed=0,
          tiling_output=False,
          max_number_contradiction_tries=1):
    """Generate an output from the patterns read at construction time.

    Args:
        output_size: size of the output grid, stored in ``self.output_grid_size``.
        seed: base seed; attempt number ``k`` seeds ``np.random`` with ``seed + k * 100``.
        tiling_output: stored in ``self.tile_around_bounds``; when set,
            propagation wraps around the grid borders.
        max_number_contradiction_tries: number of solve attempts before giving up.

    Returns:
        The result of ``self.assign_wave_to_output()`` for the first successful attempt.

    Raises:
        RuntimeError: when no attempt succeeded.
    """
    self.output_grid_size = output_size
    self.tile_around_bounds = tiling_output

    # The allowed adjacencies are derived from the input patterns only, they do not
    # depend on the seed or on the output grid, so they are computed once for all attempts.
    self.calculate_adjacencies()

    for solve_attempt in range(max_number_contradiction_tries):

        # Set the seed for our random picking of values
        np.random.seed(seed + solve_attempt * 100)

        # Initialize WFC process
        self.initialize_grid()
        self.initialize_entropy_grid()

        if self.respect_user_constraints:
            # NOTE(review): ForceUserConstraints is not defined in the visible part of
            # this module - confirm it exists before enabling respect_user_constraints.
            success = ForceUserConstraints()  # is this need?
        else:
            success = True

        if success:
            # Run the WFC solve itself
            success = self.run_wfc_solve()

        # Assign the output values
        if success:
            return self.assign_wave_to_output()

    # If we have exceeded the number of retries for the solve,
    # we will throw an error to tell the user no solution has been found
    raise RuntimeError("Looks like solution for current input parameters can't be found. "
                       "Try to change seed or number of contradiction tries")