"In the previous lecture, we looked at a brief introduction to iterators in Python. Iterators will serve as the basic building block for the systems that we will consider. This lecture will look at \"functions\" of iterators, i.e., procedures that produce and consume iterators.\n",
"\n",
"## FileScan Iterator \n",
"Let us revist the `FileScan` from the previous lecture. This iterator loads lines from a file one-by-one: "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"class FileScan:\n",
" \"\"\"Loads a large file into the\n",
" program line-by-line\"\"\"\n",
" \n",
" def __init__(self, filename):\n",
" self.filename = filename\n",
" \n",
" def __iter__(self):\n",
" self.file = open(self.filename, 'r')\n",
" self.line = self.file.readline()\n",
" return self\n",
" \n",
" def __next__(self):\n",
" if self.line != \"\":\n",
" result = int(self.line)\n",
" self.line = self.file.readline()\n",
" return result\n",
" else:\n",
" self.file.close()\n",
" raise StopIteration"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can use this iterator in code to process the lines in a specified file of numbers."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[5, 6, 7, 9, 15]\n"
]
}
],
"source": [
"import itertools\n",
"file = FileScan('my_file')\n",
"print(list(itertools.islice(file, 5)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Suppose, we wanted to transform every element in this file, e.g, normalize each value by 100. We could write code as follows:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.05\n",
"0.06\n",
"0.07\n",
"0.09\n",
"0.15\n"
]
}
],
"source": [
"for i in FileScan('my_file'):\n",
" print(i/100.0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In a sense, the transformation `i/100.0` defines another iterator. We can make this explicit with a new iterator class. This iterator class will take *another iterator* as an argument in its constructor, and return each next value transformed. "
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class Normalize:\n",
" \"\"\"Divides each of an iterator of numbers by\n",
" 100\"\"\"\n",
" \n",
" def __init__(self, iter_in):\n",
" self.iter_in = iter_in\n",
" \n",
" def __iter__(self):\n",
" self.input_state = iter(self.iter_in) \n",
" #we have to explicitly perserve the input\n",
" #state. \n",
" return self\n",
"\n",
" def __next__(self):\n",
" #poll the next value of the input and divide it by 100.\n",
" return next(self.input_state)/100.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let us see how we can use `Normalize` to simplify our code. Now, we can simply compose the two iterator classes and we get the right behavior. "
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.05\n",
"0.06\n",
"0.07\n",
"0.09\n",
"0.15\n"
]
}
],
"source": [
"for i in Normalize(FileScan('my_file')):\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, let's consider an example where we want to change the number of elements. Consider a `Filter` iterator that removes all values of the input iterator less than a threshold."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"class Filter:\n",
" \"\"\"Skips elements that are less than\n",
" a given threshold\"\"\"\n",
" \n",
" def __init__(self, iter_in, thresh):\n",
" self.iter_in = iter_in\n",
" self.thresh = thresh\n",
" \n",
" def __iter__(self):\n",
" self.input_state = iter(self.iter_in) \n",
" #we have to explicitly perserve the input\n",
" #state. \n",
" return self\n",
"\n",
" def __next__(self):\n",
" #skip elements less than the threshold\n",
" elem = next(self.input_state)\n",
" \n",
" if elem < self.thresh:\n",
" return self.__next__() #Recursive, whoa!\n",
" \n",
" return elem"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can compose all of the iterator classes together and get a transformed and filtered iterator over the data."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.06\n",
"0.07\n",
"0.09\n",
"0.15\n"
]
}
],
"source": [
"for i in Filter(Normalize(FileScan('my_file')),0.06):\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are several interesting aspects of this programming model. Notice that the code `Filter(Normalize(FileScan('my_file')),0.1)` runs nearly instantly. Until you explicitly call for the next element from that expression *it will not evaluate anything*. In programming language theory, this is called lazy evaluation---an evaluation strategy which delays the evaluation of an expression until its value is needed (non-strict evaluation). Lazy evaluation is indispensible for situations when data are delayed or there are unpredictable timing issues. Let's consider a variant of the `FileScan` iterator that is \"broken\" meaning it has delays in retrieving data. We added an artificial 1 second sleep in between each line fetched:"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"class BrokenFileScan:\n",
" \"\"\"Loads a large file into the\n",
" program line-by-line\"\"\"\n",
" \n",
" def __init__(self, filename):\n",
" self.filename = filename\n",
" \n",
" def __iter__(self):\n",
" self.file = open(self.filename, 'r')\n",
" self.line = self.file.readline()\n",
" return self\n",
" \n",
" def __next__(self):\n",
" if self.line != \"\":\n",
" result = int(self.line)\n",
" self.line = self.file.readline()\n",
" \n",
" time.sleep( 2 ) #sleep for 1 sec\n",
" \n",
" return result\n",
" else:\n",
" self.file.close()\n",
" raise StopIteration"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.06\n",
"0.07\n",
"0.09\n",
"0.15\n"
]
}
],
"source": [
"for i in Filter(Normalize(BrokenFileScan('my_file')),0.06):\n",
" print(i)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An iterator model allows you to avoid delays that are unnessary to your program. Suppose, we were interested in only taking the first 3 elements (a 3 sec delay):"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 23, 4]\n"
]
}
],
"source": [
"import itertools\n",
"file = BrokenFileScan('my_file')\n",
"print(list(itertools.islice(file, 3)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this sense, a programming with iterators is self-optimizing. Downstream logic consumes only what it needs. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Operators\n",
"`Filter`, `Normalize` and `FileScan` are special cases of a general concept of an `Operator`. Manipulating iterators is a key tool in the design of data-intensive systems. Operators define transformations of iterators. An operator is an object produced from a collection iterators that is itself an iterable object. Maintaining this discipline and programming with operators is a key tool to allow for robust and efficient code. We will show later that many important computations can be expresses simply as a composition of operators."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"class Operator:\n",
" \"\"\"A template for a generic operator\"\"\"\n",
" \n",
" def __init__(self, inputs, args):\n",
" self.inputs = inputs\n",
" self.args = args\n",
" \n",
" def __iter__(self):\n",
" self.iterators = [iter(i) for i in inputs] #store a list of iterators\n",
" return self\n",
" \n",
" def __next__(self):\n",
" # do something here!!!\n",
" raise NotImplemented(\"DO SOMETHING HERE!!!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's now consider an example of an operator that consumes multiple input iterators. Consider two iterators `in1` and `in2`, each iterates over a stream of numbers. We want to define a `MatchOperator` that iterates over all elements that appear in *both* iterators. The algorithm that we are going to use is called a Nested Loop Join. In pseudo-code, a nested loop join: iterates over one of the iterators, then for each element, iterates over the other iterator. Below is an animation of the basic iteration scheme:\n",