# Map Reduce

## Input Class 선언 (declare the input classes)

class InputData:
    """Base interface for the data handed to a single map/reduce worker.

    Subclasses override ``read`` to return the data itself and the
    ``generate_inputs`` classmethod to build instances for a location.
    Both methods here only raise ``NotImplementedError``.
    """

    def read(self):
        """Return this input's data. Must be overridden by subclasses."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, file_path: str):
        """Produce the inputs for ``file_path``. Must be overridden by subclasses."""
        raise NotImplementedError


class FileInputData(InputData):
    """Input backed by a single file on disk.

    Args:
        file_path: path to an existing regular file.

    Raises:
        ValueError: if ``file_path`` is empty or is not an existing file.
    """

    def __init__(self, file_path):
        if not file_path:
            raise ValueError(f"path error : {file_path}")
        super().__init__()

        if not os.path.isfile(file_path):
            raise ValueError(f"file not exists {file_path}")

        self.path = file_path

    def read(self):
        """Return the entire file contents as text.

        The file is opened in text mode with the platform default encoding.
        """
        # The context manager closes the handle deterministically instead of
        # leaving it open until the file object is garbage-collected.
        with open(self.path) as handle:
            return handle.read()

    @classmethod
    def generate_inputs(cls, path: str):
        """Yield one instance per regular file directly inside directory ``path``.

        Subdirectories and other non-file entries are skipped, since the
        constructor would reject them with ``ValueError`` mid-iteration.
        Entries are yielded in ``os.listdir`` order.
        """
        for name in os.listdir(path):
            file_path = os.path.join(path, name)
            if os.path.isfile(file_path):
                yield cls(file_path)

## Worker Class 선언 (declare the worker classes)

class Worker:
    """Base class for a map/reduce worker bound to one input.

    Subclasses implement ``map`` (compute a partial result from
    ``input_data`` and store it in ``result``) and ``reduce`` (merge another
    worker's ``result`` into this one). Both raise ``NotImplementedError`` here.
    """

    def __init__(self, input_data):
        self.input_data = input_data
        # Starts as None; a subclass's map() is expected to fill it in.
        self.result = None

    def map(self):
        """Compute this worker's partial result. Must be overridden."""
        raise NotImplementedError

    def reduce(self, other):
        """Merge ``other``'s result into this worker. Must be overridden."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, path):
        """Return a list with one ``cls`` instance per input from ``input_class.generate_inputs(path)``."""
        return [cls(item) for item in input_class.generate_inputs(path)]


class LineCountWorker(Worker):
    """Worker that counts newline characters in its input.

    Only newline characters are counted, so a final line that has no
    trailing newline does not add to the total.
    """

    def map(self):
        """Store the number of newline characters in the input in ``result``."""
        contents = self.input_data.read()
        newline_count = contents.count("\n")
        self.result = newline_count

    def reduce(self, other):
        """Add ``other``'s newline count into this worker's ``result``."""
        partial = other.result
        self.result += partial

## Map Reduce 수행 (run the map/reduce job)

def execute(workers):
    """Run ``map`` on every worker concurrently, then fold results with ``reduce``.

    Each worker's ``map`` runs in its own thread. After every thread has
    finished, the first worker's ``reduce`` is called with each remaining
    worker in order, and the first worker's ``result`` is returned.

    Args:
        workers: non-empty, indexable sequence of worker objects.

    Returns:
        ``workers[0].result`` after all reductions.

    Raises:
        IndexError: if ``workers`` is empty (there is nothing to reduce into).
        Exception: the first exception (in ``workers`` order) raised by any
            worker's ``map``; no reduction is performed in that case.
    """
    if not workers:
        # IndexError matches what ``workers[0]`` on an empty sequence raises,
        # so existing handlers keep working; the message is just clearer.
        raise IndexError("execute() requires at least one worker")

    # One slot per worker. A thread records its exception here rather than
    # letting it die with the thread, where it would only be printed.
    errors = [None] * len(workers)

    def _run_map(index, worker):
        # Thread target: run one worker's map and capture any failure.
        try:
            worker.map()
        except Exception as exc:  # re-raised in the calling thread below
            errors[index] = exc

    threads = [
        Thread(target=_run_map, args=(index, worker))
        for index, worker in enumerate(workers)
    ]
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    for exc in errors:
        if exc is not None:
            raise exc

    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)

    return first.result

def map_reduce(worker_class, input_class, path):
    """Run a complete map/reduce job and return the reduced result.

    Args:
        worker_class: class providing ``create_workers`` (e.g. a ``Worker``
            subclass); it builds one worker per input.
        input_class: class whose ``generate_inputs(path)`` supplies the inputs.
        path: location forwarded to ``input_class.generate_inputs``.

    Returns:
        Whatever ``execute`` returns for the created workers.
    """
    return execute(worker_class.create_workers(input_class, path))