Commit dcf9c967 by Sanjay Krishnan

updated with class files

parent 59c34038
This source diff could not be displayed because it is too large. You can view the blob instead.
class Load:
"""Loads a large file into the
program line-by-line"""
def __init__(self, filename):
self.filename = filename
def __iter__(self):
self.file = open(self.filename, 'r')
self.line = self.file.readline()
return self
def __next__(self):
if self.line != "":
result = self.line
self.line = self.file.readline()
return int(result)
else:
raise StopIteration
def Flush(it, filename):
file = open(filename, 'w')
for i in it:
file.write(str(i)+'\n')
file.close()
def Size(filename):
import os
return os.path.getsize(filename)
\ No newline at end of file
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Out-of-core algorithms\n",
"External memory algorithms or out-of-core algorithms are algorithms that are designed to process data that is too large to fit into a computer's main memory at one time.\n",
"\n",
"## Motivation\n",
"One strategy for memory management is *virtualization*. Memory virtualization is when an application believes it has more memory than it actually does, and a secondary process fetches and replaces memory seemlessly.\n",
"Computer operating systems provide each program with a virtual memory address space that is effectively infinite. The program can use this address space as if it were main memory.\n",
"\n",
"Behind the scenes, the underlying memory management system implements *paging*, a scheme by which a computer stores and retrieves data from secondary storage for use in main memory.\n",
"In this scheme, the operating system retrieves data from secondary storage in same-size blocks called pages. Paging is an important part of virtual memory implementations in modern operating systems, using secondary storage to let programs exceed the size of available physical memory.\n",
"\n",
"The paging strategies used in operating systems are agnostic to the program that the user writes. These are often not efficient enough for data-intensive programs. Efficiency means minimizing excessive use of the secondary storage.\n",
"In a data-engineering setting, we are often working with scales that are large enough that small overheads matter. Fetching a 4KB block of data from a modern SSD hard-drive takes roughly 1 ms. 1TB of data contains nearly 200M such blocks. If even 1% of fetch requests are spurious or at an inopportune time, we waste almost 45 minutes computation. \n",
"\n",
"Careful, program-specific memory management is studied in the field of out-of-core algorithms.\n",
"Out-of-core refers to a set of algorithms working with data that cannot fit into the memory of a single computer, but that can easily fit into some data storage such as a local hard disk or web repository.\n",
"In this model, we explicitly give the algorithm the control to page in and page out data. The developer can choose how and when to do it for each block of data.\n",
"\n",
"## Merge-Sort\n",
"A classical merge sort works as follows:\n",
"* Recursively, sub-divide the unsorted list into n sublists, each containing one element (a list of one element is considered sorted).\n",
"* Repeatedly merge sublists to produce new sorted sublists until there is only one sublist remaining. This will be the sorted list.\n",
"\n",
"This algorithm works when there is enough memory to store the entire dataset--you can store all of the sorted sublists (called runs) in memory. We want to extend this to an out-of-core model. To do so, let's think about a generalization of merge-sort. Merge-sort is an example of a divide-and-conquer algorithm. It recursively partitions the data into the smallest unit that we can trivially solve (e.g., sorting a singleton list) and then algorithmically combines the results from the solved sub-units. \n",
"\n",
"Let's assume we have access to a subroutine `sortk()` which is a blackbox that can sort at most k elements. The same general algorithm would apply. Instead of partitioning down to the singleton lists, we could partition until each sublist had k or k-1 elements. Once we get the sorted sublists, we can merge them as before with the `merge()` subroutine.\n",
"* Recursively, sub-divide the unsorted list into n/k sublists, each containing k or k-1 elements.\n",
"* Apply `sortk()` to each sublist.\n",
"* Repeatedly `merge()` sublists to produce new sorted sublists until there is only one sublist remaining. This will be the sorted list.\n",
"\n",
"This basic generalization will allow us control the memory usage of a sort operator. We will load in batches of k tuples into memory and apply a known in-memory sorting algorithm, and then write the sorted values to disk. Then, we will apply a streaming `merge()` algorithm to merge the sorted lists.\n",
"\n",
"## Merge Operator\n",
"The first thing that we need to write is a streaming merge operator. A merge operator is given two iterators in sorted order. It combines the iterators and preserve the sorted order. Suppose, we have lists `[1,4,6]` and `[0,3,9]`, the result is `[0,1,3,4,6,9]`."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"class Merge:\n",
" '''\n",
" Given two sorted iterators as input, merge returns each next \n",
" sorted value (the otuput is sorted)\n",
" '''\n",
" \n",
" def __init__(self, in1, in2):\n",
" self.in1 = in1\n",
" self.in2 = in2\n",
" \n",
" \n",
" def __iter__(self):\n",
" self.it1 = iter(self.in1)\n",
" self.it2 = iter(self.in2)\n",
" \n",
" #pin the top two elements\n",
" self.i = self._inc1()\n",
" self.j = self._inc2()\n",
" \n",
" return self\n",
" \n",
" \n",
" #helper methods that increment either 1 or 2\n",
" def _inc1(self):\n",
" try:\n",
" return next(self.it1)\n",
" except StopIteration:\n",
" return None\n",
" \n",
" def _inc2(self):\n",
" try:\n",
" return next(self.it2)\n",
" except StopIteration:\n",
" return None\n",
" \n",
" \n",
" \n",
" #get the next value\n",
" def __next__(self):\n",
" \n",
" if self.i == None and self.j != None:\n",
" #is in1 finished?\n",
" \n",
" rtn = self.j\n",
" self.j = self._inc2()\n",
" return rtn\n",
" elif self.j == None and self.i != None:\n",
" #is in2 finished?\n",
" \n",
" rtn = self.i\n",
" self.i = self._inc1()\n",
" return rtn\n",
" elif self.i == None and self.j == None:\n",
" #are we fully finished?\n",
" \n",
" raise StopIteration()\n",
" elif self.i <= self.j:\n",
" #in1 bigger?\n",
" \n",
" rtn = self.i\n",
" self.i = self._inc1()\n",
" return rtn \n",
" else:\n",
" #in2 bigger?\n",
" \n",
" rtn = self.j\n",
" self.j = self._inc2()\n",
" return rtn "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0\n",
"1\n",
"2\n",
"4\n",
"5\n",
"7\n"
]
}
],
"source": [
"for i in Merge([1,2,4],[0,5,7]):\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This merge operator works in a streaming way. It only keeps the top two elements as state at any given time. \n",
"\n",
"## Load and Flush\n",
"The next thing that we need is a way to write data to disk and load data from disk. We are going to define two functions `Load(filename)` (which returns an iterator over lines in the file) and `Flush(iter, filename)` (stores an iterator to a file). Treat this as a black box and the code is in `iosim.py`. "
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0\n",
"1\n",
"2\n",
"4\n",
"5\n",
"7\n"
]
}
],
"source": [
"from iosim import *\n",
"\n",
"#for i in Load('input'):\n",
"# print(i)\n",
"\n",
"#Flush(Load('input'), 'my_class_output2')\n",
"\n",
"\n",
"for i in Load('output'):\n",
" print(i)\n",
"\n",
"\n",
"#for i in Merge(Load('my_class_output2'),[0,5,7]):\n",
"# print(i)\n",
"\n",
"#Flush(Merge(Load('input'),[0,5,7]), 'output')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Merge Passes\n",
"Suppose we wanted to merge N sorted iterators, we would have to apply `Merge` passes (merging two at a time). The `one_pass` function takes a list of filenames (pointing to sorted files) and then merges every two and returns a list of new filenames."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def one_pass(ins):\n",
" '''\n",
" Given N sorted files as input, returns \n",
" filenames of the newly merged files\n",
" '''\n",
" output_iterators = []\n",
" for i in range(1,len(ins),2):\n",
" filename = ins[i-1] + \".\" + ins[i]\n",
" \n",
" #Key construst! Load and Flush\n",
" Flush(Merge(Load(ins[i-1]),Load(ins[i])), filename)\n",
" \n",
" output_iterators.append(filename)\n",
" \n",
" if len(ins)%2 == 1: #if odd add the last iterator\n",
" output_iterators.append(ins[len(ins)-1])\n",
" \n",
" return output_iterators"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We execute the passes until only one iterator is left. For example, given the files `input1` and `input2`, we get:"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['input1.input2', 'input']"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"one_pass(['input1', 'input2', 'input'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note the case when the number of files is odd one of the runs will not get merged. This is fine because it will get merged in the next round because we will take repeated passes until one file is left."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"def passes(ins):\n",
" '''\n",
" Takes in a list of sorted files and outputs a single sorted file\n",
" '''\n",
" #run each of the merge passes\n",
" while len(ins) > 1:\n",
" ins = one_pass(ins)\n",
" \n",
" #return the final output\n",
" return ins[0]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can run this code to merge N runs at a time. "
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1\n",
"2\n",
"4\n",
"5\n",
"6\n",
"6\n",
"7\n",
"7\n",
"7\n",
"9\n",
"9\n",
"12\n",
"14\n"
]
}
],
"source": [
"output_file = passes(['input1', 'input2', 'input3'])\n",
"\n",
"for i in Load(output_file):\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## In-Memory Sort\n",
"Now, we are ready to create the sorted runs for the Merge operator to merge. This requires taking an input iterator and partitioning it into chunks of k elements. It will then do an in-memory sort on each of those chunks and flush that chunk to disk."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"def sortk(in1, k):\n",
" buffer = []\n",
" filenames = []\n",
" \n",
" for i,v in enumerate(in1):\n",
" buffer.append(v)\n",
" \n",
" if len(buffer) >= k:\n",
" filename = 'output'+str(i)\n",
" \n",
" #flush when limit reached\n",
" buffer.sort()\n",
" Flush(buffer, filename)\n",
" buffer = []\n",
" \n",
" filenames.append(filename)\n",
" \n",
" #flush remaining elements\n",
" buffer.sort()\n",
" filename = 'output'+str(i+1)\n",
" filenames.append(filename)\n",
" Flush(buffer, filename)\n",
" buffer = []\n",
" \n",
" return filenames"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Example code of this working is breaking up a stream of 7 numbers into chunks of 3"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['output2', 'output5', 'output8', 'output9']"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sortk([2,4,1,6,3,9,10,-1,2], 3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output of `sortk` can be passed into passes to get the final result:"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'output9.output19.output29.output39.output48'"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\n",
"file = passes(sortk(Load('input'), 10))\n",
"file\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"2\n",
"3\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"9\n",
"10\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"16\n"
]
}
],
"source": [
"for i in Load('output9.output19.output29.output39.output48'):\n",
" print(i)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-3\n",
"-1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"1\n",
"2\n",
"3\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"4\n",
"9\n",
"10\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"11\n",
"16\n",
"Total time 0.035851\n"
]
}
],
"source": [
"import datetime\n",
"\n",
"now = datetime.datetime.now()\n",
"\n",
"file = passes(sortk(Load('input'), 10))\n",
"for i in Load(file):\n",
" print(i)\n",
"\n",
"print('Total time', (datetime.datetime.now()-now).total_seconds() )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment