Academic Integrity: tutoring, explanations, and feedback — we don’t complete graded work or submit on a student’s behalf.

Python 3.x completing HuffmanHeap: This is what needs to be completed HuffmanHea

ID: 3919893 • Letter: P

Question

Python 3.x completing HuffmanHeap:

This is what needs to be completed HuffmanHeap.py:

Encoder.py

Question 1 (17 points): Purpose: To implement an ADT to support efficient execution of a HuffmanTree encoder Degree of Difficulty: Easy to Moderate Phoenix Wright, ace attorney, loves being lazy - erm, efficient. Phoenix is also short on money. He has heard of text compression, and thinks it might be a good idea to compress all his text files to save on valuable disk space (disk space is expensive for him!). Larry Butz, his useless IT friend, wrote most of the code for text compression using Huffman trees, but did not finish the job (surprising no one). On the Assignment 10 Moodle page, you'll find a program named Encoder.py. The program follows a simple design, and is well-documented. All the pieces should be familiar to you, as they are similar to discussion in class. The Encoder.py program imports two supporting ADTs » HuffmanTree.py Defines the Huf fmanTree class . HuffmanHeap.py Defines the HuffmanHeap class. The HuffmanTree class was discussed in lecture, and its definition should be familiar. There is one change that you should note: the function build_codec that builds the codec from a HuffmanTree object is novw a method in the HuffmanTree class The ADT HuffmanHeap, was also discussed in lecture, though the code was not given. A heap is a kind of queue that guarantees that the dequeue operation always removes and returns the smallest value in the queue (this is not the FIFO protoco). The HuffmanHeap stores HuffmanTree objects, and has the following methods . __init_.(self, leafs): Creates a new HuffmanHeap object. . enqueue(self, tree): Adds the given HuffmanTree to the heap . dequeue(self): Removes and returns the smallest HuffmanTree in the heap ·-len--(self): Returns the number of HuffmanTrees in the heap. Defining this method allows pro- grammers to call len ) on HuffmanHeap objects In lecture we saw that we could implement these operations so that they all have O(n) worst case time complexity, where n is the number of HuffmanTree objects in the HuffmanHeap. However, we also de- scribed in lecture how these operations could be implemented so that they all have worst case time com lexity of O(1). Hint: use 2 lists The HuffmanHeap class is incomplete. Your job is to complete the 4 methods described above, so that the Encoder rogram correctly encodes text files (When you've completed Questions 1 and 2, you should be able to encode and decode any text files. f you want to test your encoder, you can try decoding with the program given for Question 2.) NOTE: some symbols do not work for our encoding solution. apostrophes, semi-colons and non-standard symbols may have errors

Explanation / Answer

import json
from collections import Counter, namedtuple
from multiprocessing import Pool, cpu_count
from os.path import getsize
from time import clock

MULTIPROCESSING_THRESHOLD = 200000


class HuffmanHeap(list):
   Element = namedtuple('Element', ('weight', 'encodings'))

   def push(self, weight, encodings):
       element = HuffmanHeap.Element(weight, encodings)

       if not self or weight >= self[-1].weight:
           self.append(element)
           return

       for i in range(len(self)):
           if weight < self[i].weight:
               self.insert(i - 1, element)
               return


def create_encoding_table(bytes_):
   heap = HuffmanHeap()

   for byte, count in Counter(bytes_).items():
       heap.push(count, {byte: ''})

   while len(heap) - 1:
       lo, hi = heap.pop(0), heap.pop(0)

       for byte in lo.encodings:
           lo.encodings[byte] = '0' + lo.encodings[byte]
       for byte in hi.encodings:
           hi.encodings[byte] = '1' + hi.encodings[byte]

       heap.push(lo.weight + hi.weight, {**lo.encodings, **hi.encodings})

   return heap[0].encodings


def segment(array, parts):
   avg = len(array) / parts
   last = 0.0

   while last < len(array):
       yield array[int(last):int(last + avg)]
       last += avg


def encoding_worker(data, table):
   return ''.join(table[byte] for byte in data)


def encode(data, table, pool):
   tasks = [pool.apply_async(encoding_worker, args=(part, table)) for part in segment(data, cpu_count())]
   del data, table
   return ''.join(task.get() for task in tasks)


def segment_bytewise(string, parts):
   avg = len(string) / parts
   avg = avg + (8 - avg % 8)
   last = 0

   while last < len(string):
       yield string[int(last):int(last + avg)]
       last += avg


def pack_bytes_worker(string):
   return bytearray(int(string[i:i + 8], 2) for i in range(0, len(string), 8))


def pack_bytes(string, pool):
   tasks = [pool.apply_async(pack_bytes_worker, args=(part,))
             for part in segment_bytewise(string + '0' * (8 - len(string) % 8), cpu_count())]
   del string
   return bytearray(b''.join(task.get() for task in tasks))


def encode_file(filename, table_filename, compressed_filename, encoding='UTF-8', silent=False):
   start_time = clock()

   silent or print('Reading... ', end='')
   with open(filename, 'rb') as file:
       uncompressed_bytes = file.read()
   silent or print('Done')

   if not uncompressed_bytes or len(uncompressed_bytes) == 1:
       raise ValueError("Not enough data to compress")

   silent or print('Creating tables... ', end='')
   encoding_table = create_encoding_table(uncompressed_bytes)
   decoding_table = {value: key for key, value in encoding_table.items()}
   decoding_table['size'] = len(uncompressed_bytes)
   silent or print('Done')

   silent or print('Writing table... ', end='')
   with open(table_filename, 'w+', encoding=encoding) as file:
       json.dump(decoding_table, file)
   silent or print('Done')

   del decoding_table

   pool = Pool(cpu_count())

   silent or print('Compressing... ', end='')
   if len(uncompressed_bytes) < MULTIPROCESSING_THRESHOLD:
       compressed_string = encoding_worker(uncompressed_bytes, encoding_table)
   else:
       silent or print('using multiprocessing... ', end='')
       compressed_string = encode(uncompressed_bytes, encoding_table, pool)
   silent or print('Done')

   del uncompressed_bytes, encoding_table

   silent or print('Preparing data using multiprocessing... ', end='')
   compressed_bytes = pack_bytes(compressed_string, pool)
   silent or print('Done')

   del compressed_string
   pool.close()
   pool.terminate()

   silent or print('Writing... ', end='')
   with open(compressed_filename, 'wb+') as file:
       file.write(compressed_bytes)
   silent or print('Done')
   silent or print('Encoded in {:.4f} seconds, {:.2%} Space savings'.format(
       clock() - start_time,
       1 - (getsize(compressed_filename) + getsize(table_filename)) / getsize(filename))
   )


def unpack_bytes_worker(bytes_):
   return ''.join('{0:08b}'.format(byte) for byte in bytes_)


def unpack_bytes(bytes_, pool):
   tasks = [pool.apply_async(unpack_bytes_worker, args=(part,)) for part in segment(bytes_, cpu_count())]
   del bytes_
   return ''.join(task.get() for task in tasks)


def decode(string, table, size):
   uncompressed_bytes = b''
   string = iter(string)

   buffer = ''
   while len(uncompressed_bytes) < size:
       buffer += next(string)
       if buffer in table:
           uncompressed_bytes += bytes([table[buffer]])
           buffer = ''

   return uncompressed_bytes


def decode_file(filename, table_filename, uncompressed_filename, encoding='UTF-8', silent=False):
   start_time = clock()

   silent or print('Reading file...', end='')
   with open(filename, 'rb') as file:
       compressed_bytes = file.read()
   silent or print('Done')

   pool = Pool(cpu_count())

   silent or print('Preparing data using multiprocessing... ', end='')
   compressed_string = unpack_bytes(compressed_bytes, pool)
   silent or print('Done')

   pool.close()
   pool.terminate()

   silent or print('Reading table... ', end='')
   with open(table_filename, encoding=encoding) as file:
       decoding_table = json.load(file)

   size = decoding_table.pop('size')
   silent or print('Done')

   silent or print('Uncompressing...', end='')
   uncompressed_bytes = decode(compressed_string, decoding_table, size)
   silent or print('Done')

   silent or print('Writing... ', end='')
   with open(uncompressed_filename, 'wb+') as file:
       file.write(uncompressed_bytes)
   silent or print('Done')
   silent or print('Decoded in {:.4f} seconds'.format(clock() - start_time))


def main():
   in_name = 'in.bmp'
   table_name = 'table.json'
   compressed_name = 'compressed.bin'
   out_name = 'out.bmp'

   encode_file(in_name, table_name, compressed_name)
   decode_file(compressed_name, table_name, out_name)

   with open(in_name, 'rb') as in_, open(out_name, 'rb') as out:
       assert in_.read() == out.read()


if __name__ == '__main__':
   main()

Encoder.py

import heapq

class Node:
    def __init__(self, prob, symbol = None):
        """Create node for given symbol and probability."""
        self.left = None
        self.right = None
        self.symbol = symbol
        self.prob = prob

    # Need comparator method at a minimum to work with heapq
    def __lt__(self, other):
        return self.prob < other.prob
  
    def encode(self, encoding):
        """Return bit encoding in traversal."""
        if self.left is None and self.right is None:
            yield (self.symbol, encoding)
        else:
            for v in self.left.encode(encoding + '0'):
                yield v
            for v in self.right.encode(encoding + '1'):
                yield v

class Huffman:
    def __init__(self, initial):
        """Construct encoding given initial corpus."""
        self.initial = initial
      
        # Count frequencies
        freq = {}
        for _ in initial:
            if _ in freq:
                freq[_] += 1
            else:
                freq[_] = 1

        # Construct priority queue
        pq = []
        for symbol in freq:
            pq.append(Node(freq[symbol], symbol))
        heapq.heapify(pq)

        # special case: what if only one symbol?
        if len(pq) == 1:
            self.root = Node(1)
            self.root.left = pq[0]
            self.encoding = {symbol: '0'}
            return

        # Huffman Encoding Algorithm
        while len(pq) > 1:
            n1 = heapq.heappop(pq)
            n2 = heapq.heappop(pq)
            n3 = Node(n1.prob + n2.prob)
            n3.left = n1
            n3.right = n2
            heapq.heappush(pq, n3)

        # Record
        self.root = pq[0]
        self.encoding = {}
        for sym,code in pq[0].encode(''):
            self.encoding[sym]=code

    def __repr__(self):
        """Show encoding"""
        return 'huffman:' + str(self.encoding)

    def encode(self, s):
        """Return bit string for encoding."""
        bits = ''
        for _ in s:
            if not _ in self.encoding:
                raise ValueError("'" + _ + "' is not encoded character")
            bits += self.encoding[_]
        return bits

    def decode(self, bits):
        """Decode ASCII bit string for simplicity."""
        node = self.root
        s = ''
        for _ in bits:
            if _ == '0':
                node = node.left
            else:
                node = node.right

            if node.symbol:
                s += node.symbol
                node = self.root

        return s