Commit 8da8ded9 by Sanjay Krishnan

Updated with more data

parent 688691cf
......@@ -435,7 +435,7 @@
{
"cell_type": "code",
"execution_count": 91,
"id": "0248b4a6",
"id": "4fbc338f",
"metadata": {},
"outputs": [
{
......@@ -468,7 +468,7 @@
{
"cell_type": "code",
"execution_count": 86,
"id": "a53037d7",
"id": "1a4c1bca",
"metadata": {},
"outputs": [
{
......@@ -493,7 +493,7 @@
{
"cell_type": "code",
"execution_count": 87,
"id": "e285087a",
"id": "48d84085",
"metadata": {},
"outputs": [
{
......@@ -518,7 +518,7 @@
{
"cell_type": "code",
"execution_count": 89,
"id": "19a949e5",
"id": "5b2387e8",
"metadata": {},
"outputs": [
{
......@@ -546,15 +546,66 @@
},
{
"cell_type": "code",
"execution_count": null,
"id": "284684ec",
"execution_count": 94,
"id": "66779452",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"goo\n"
]
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<IPython.core.display.Image object>"
]
},
"execution_count": 94,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"#model serving\n",
"#service oriented architecture\n",
"#key performance indicators"
"from dask import delayed\n",
"\n",
"def f(x):\n",
" print('foo' + str(x))\n",
" return x + 1\n",
"\n",
"def b(x,y):\n",
" print('bar')\n",
" return x + y\n",
"\n",
"def g():\n",
" print('goo')\n",
" return 1\n",
"\n",
"x = delayed(g)()\n",
"y = delayed(f)(x) + delayed(f)(3)\n",
"z = delayed(b)(g(), y)\n",
"\n",
"z.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0df2b21",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "add91227",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
......
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using and Debugging Dask (and other Big Data systems)\n",
"\n",
"In this lecture, we're going to deep dive into using Dask and apply what we've learned in previous lectures to understand why the code works the way that it does.\n",
"\n",
"First, we are going import dask. "
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, let's load some data into this notebook."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>hvfhs_license_num</th>\n",
" <th>dispatching_base_num</th>\n",
" <th>pickup_datetime</th>\n",
" <th>dropoff_datetime</th>\n",
" <th>PULocationID</th>\n",
" <th>DOLocationID</th>\n",
" <th>SR_Flag</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>HV0003</td>\n",
" <td>B02867</td>\n",
" <td>2019-02-01 00:05:18</td>\n",
" <td>2019-02-01 00:14:57</td>\n",
" <td>245</td>\n",
" <td>251</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>HV0003</td>\n",
" <td>B02879</td>\n",
" <td>2019-02-01 00:41:29</td>\n",
" <td>2019-02-01 00:49:39</td>\n",
" <td>216</td>\n",
" <td>197</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>HV0005</td>\n",
" <td>B02510</td>\n",
" <td>2019-02-01 00:51:34</td>\n",
" <td>2019-02-01 01:28:29</td>\n",
" <td>261</td>\n",
" <td>234</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>HV0005</td>\n",
" <td>B02510</td>\n",
" <td>2019-02-01 00:03:51</td>\n",
" <td>2019-02-01 00:07:16</td>\n",
" <td>87</td>\n",
" <td>87</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>HV0005</td>\n",
" <td>B02510</td>\n",
" <td>2019-02-01 00:09:44</td>\n",
" <td>2019-02-01 00:39:56</td>\n",
" <td>87</td>\n",
" <td>198</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" hvfhs_license_num dispatching_base_num pickup_datetime \\\n",
"0 HV0003 B02867 2019-02-01 00:05:18 \n",
"1 HV0003 B02879 2019-02-01 00:41:29 \n",
"2 HV0005 B02510 2019-02-01 00:51:34 \n",
"3 HV0005 B02510 2019-02-01 00:03:51 \n",
"4 HV0005 B02510 2019-02-01 00:09:44 \n",
"\n",
" dropoff_datetime PULocationID DOLocationID SR_Flag \n",
"0 2019-02-01 00:14:57 245 251 NaN \n",
"1 2019-02-01 00:49:39 216 197 NaN \n",
"2 2019-02-01 01:28:29 261 234 NaN \n",
"3 2019-02-01 00:07:16 87 87 NaN \n",
"4 2019-02-01 00:39:56 87 198 NaN "
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = dd.read_csv('fhvhv_tripdata_2019-02.csv')\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's do some basic analysis, how many rows are there in this dataset?"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 39 s, sys: 6.21 s, total: 45.2 s\n",
"Wall time: 37.4 s\n"
]
},
{
"data": {
"text/plain": [
"20159102"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time len(df) #why does this take longer than loading"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also do some more complicated analyses:"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 44 s, sys: 7.48 s, total: 51.5 s\n",
"Wall time: 43.4 s\n"
]
},
{
"data": {
"text/plain": [
"hvfhs_license_num\n",
"HV0002 979266\n",
"HV0003 13504994\n",
"HV0004 983926\n",
"HV0005 4690916\n",
"Name: hvfhs_license_num, dtype: int64"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.groupby('hvfhs_license_num')['hvfhs_license_num'].count().compute() #i/o cost"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's redo the above analysis with a slightly different approach."
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"df2 = dd.read_parquet('fhvhv_tripdata_2019-02.pqt', columns=['hvfhs_license_num'])"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 6.14 s, sys: 896 ms, total: 7.04 s\n",
"Wall time: 6.44 s\n"
]
},
{
"data": {
"text/plain": [
"hvfhs_license_num\n",
"HV0002 979266\n",
"HV0003 13504994\n",
"HV0004 983926\n",
"HV0005 4690916\n",
"Name: hvfhs_license_num, dtype: int64"
]
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df2.groupby('hvfhs_license_num')['hvfhs_license_num'].count().compute() #why?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Why is this so much faster? Now let's look into the implementation."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"20"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.npartitions"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.05 s, sys: 216 ms, total: 1.26 s\n",
"Wall time: 1.29 s\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>hvfhs_license_num</th>\n",
" <th>dispatching_base_num</th>\n",
" <th>pickup_datetime</th>\n",
" <th>dropoff_datetime</th>\n",
" <th>PULocationID</th>\n",
" <th>DOLocationID</th>\n",
" <th>SR_Flag</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>HV0005</td>\n",
" <td>B02510</td>\n",
" <td>2019-02-03 12:32:29</td>\n",
" <td>2019-02-03 12:46:41</td>\n",
" <td>33</td>\n",
" <td>97</td>\n",
" <td>1.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>HV0005</td>\n",
" <td>B02510</td>\n",
" <td>2019-02-03 12:48:40</td>\n",
" <td>2019-02-03 12:57:35</td>\n",
" <td>97</td>\n",
" <td>106</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>HV0003</td>\n",
" <td>B02764</td>\n",
" <td>2019-02-03 12:51:53</td>\n",
" <td>2019-02-03 13:01:57</td>\n",
" <td>262</td>\n",
" <td>229</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>HV0002</td>\n",
" <td>B02914</td>\n",
" <td>2019-02-03 12:57:56</td>\n",
" <td>2019-02-03 13:15:12</td>\n",
" <td>234</td>\n",
" <td>143</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>HV0003</td>\n",
" <td>B02888</td>\n",
" <td>2019-02-03 12:56:45</td>\n",
" <td>2019-02-03 13:09:11</td>\n",
" <td>234</td>\n",
" <td>230</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" hvfhs_license_num dispatching_base_num pickup_datetime \\\n",
"0 HV0005 B02510 2019-02-03 12:32:29 \n",
"1 HV0005 B02510 2019-02-03 12:48:40 \n",
"2 HV0003 B02764 2019-02-03 12:51:53 \n",
"3 HV0002 B02914 2019-02-03 12:57:56 \n",
"4 HV0003 B02888 2019-02-03 12:56:45 \n",
"\n",
" dropoff_datetime PULocationID DOLocationID SR_Flag \n",
"0 2019-02-03 12:46:41 33 97 1.0 \n",
"1 2019-02-03 12:57:35 97 106 NaN \n",
"2 2019-02-03 13:01:57 262 229 NaN \n",
"3 2019-02-03 13:15:12 234 143 NaN \n",
"4 2019-02-03 13:09:11 234 230 NaN "
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.get_partition(2).head()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"ename": "ValueError",
"evalue": "Metadata inference failed in `apply`.\n\nYou have supplied a custom function and Dask is unable to \ndetermine the type of output that that function returns. \n\nTo resolve this please provide a meta= keyword.\nThe docstring of the Dask function you ran should have more information.\n\nOriginal error is below:\n------------------------\nValueError(\"time data 'foo' does not match format '%Y-%m-%d %H:%M:%S'\")\n\nTraceback:\n---------\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/utils.py\", line 175, in raise_on_meta_error\n yield\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py\", line 5513, in _emulate\n return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/utils.py\", line 900, in __call__\n return getattr(obj, self.method)(*args, **kwargs)\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/pandas/core/series.py\", line 4045, in apply\n mapped = lib.map_infer(values, f, convert=convert_dtype)\n File \"pandas/_libs/lib.pyx\", line 2228, in pandas._libs.lib.map_infer\n File \"<ipython-input-9-3a0da807679d>\", line 2, in <lambda>\n deltas = df['dropoff_datetime'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp()) - df['pickup_datetime'].apply(datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp())\n File \"/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_strptime.py\", line 577, in _strptime_datetime\n tt, fraction, gmtoff_fraction = _strptime(data_string, format)\n File \"/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_strptime.py\", line 359, in _strptime\n (data_string, format))\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/utils.py\u001b[0m in \u001b[0;36mraise_on_meta_error\u001b[0;34m(funcname, udf)\u001b[0m\n\u001b[1;32m 174\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 175\u001b[0;31m \u001b[0;32myield\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 176\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36m_emulate\u001b[0;34m(func, *args, **kwargs)\u001b[0m\n\u001b[1;32m 5512\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mraise_on_meta_error\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfuncname\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mudf\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"udf\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 5513\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0m_extract_meta\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0m_extract_meta\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 5514\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/utils.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, obj, *args, **kwargs)\u001b[0m\n\u001b[1;32m 899\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__call__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mobj\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 900\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mgetattr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmethod\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 901\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/pandas/core/series.py\u001b[0m in \u001b[0;36mapply\u001b[0;34m(self, func, convert_dtype, args, **kwds)\u001b[0m\n\u001b[1;32m 4044\u001b[0m \u001b[0mvalues\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mastype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mobject\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 4045\u001b[0;31m \u001b[0mmapped\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlib\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmap_infer\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mconvert\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mconvert_dtype\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 4046\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32mpandas/_libs/lib.pyx\u001b[0m in \u001b[0;36mpandas._libs.lib.map_infer\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32m<ipython-input-9-3a0da807679d>\u001b[0m in \u001b[0;36m<lambda>\u001b[0;34m(x)\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0mdatetime\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mdatetime\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0mdeltas\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdf\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'dropoff_datetime'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mdatetime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstrptime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'%Y-%m-%d %H:%M:%S'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtimestamp\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0mdf\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'pickup_datetime'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdatetime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstrptime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'%Y-%m-%d %H:%M:%S'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtimestamp\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_strptime.py\u001b[0m in \u001b[0;36m_strptime_datetime\u001b[0;34m(cls, data_string, format)\u001b[0m\n\u001b[1;32m 576\u001b[0m format string.\"\"\"\n\u001b[0;32m--> 577\u001b[0;31m \u001b[0mtt\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfraction\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mgmtoff_fraction\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_strptime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdata_string\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mformat\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 578\u001b[0m \u001b[0mtzname\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mgmtoff\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtt\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m-\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_strptime.py\u001b[0m in \u001b[0;36m_strptime\u001b[0;34m(data_string, format)\u001b[0m\n\u001b[1;32m 358\u001b[0m raise ValueError(\"time data %r does not match format %r\" %\n\u001b[0;32m--> 359\u001b[0;31m (data_string, format))\n\u001b[0m\u001b[1;32m 360\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdata_string\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m!=\u001b[0m \u001b[0mfound\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mend\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mValueError\u001b[0m: time data 'foo' does not match format '%Y-%m-%d %H:%M:%S'",
"\nThe above exception was the direct cause of the following exception:\n",
"\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-9-3a0da807679d>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0mdatetime\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mdatetime\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0mdeltas\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdf\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'dropoff_datetime'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mdatetime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstrptime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'%Y-%m-%d %H:%M:%S'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtimestamp\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0mdf\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'pickup_datetime'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdatetime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstrptime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'%Y-%m-%d %H:%M:%S'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtimestamp\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;31m#How would you fix this?\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36mapply\u001b[0;34m(self, func, convert_dtype, meta, args, **kwds)\u001b[0m\n\u001b[1;32m 3518\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3519\u001b[0m \u001b[0mudf\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 3520\u001b[0;31m \u001b[0;34m**\u001b[0m\u001b[0mkwds\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3521\u001b[0m )\n\u001b[1;32m 3522\u001b[0m \u001b[0mwarnings\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwarn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmeta_warning\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmeta\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36m_emulate\u001b[0;34m(func, *args, **kwargs)\u001b[0m\n\u001b[1;32m 5511\u001b[0m \"\"\"\n\u001b[1;32m 5512\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mraise_on_meta_error\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfuncname\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mudf\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"udf\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 5513\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0m_extract_meta\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0m_extract_meta\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 5514\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5515\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/contextlib.py\u001b[0m in \u001b[0;36m__exit__\u001b[0;34m(self, type, value, traceback)\u001b[0m\n\u001b[1;32m 128\u001b[0m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 129\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 130\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgen\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mthrow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtype\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraceback\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 131\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mStopIteration\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 132\u001b[0m \u001b[0;31m# Suppress StopIteration *unless* it's the same exception that\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/utils.py\u001b[0m in \u001b[0;36mraise_on_meta_error\u001b[0;34m(funcname, udf)\u001b[0m\n\u001b[1;32m 194\u001b[0m )\n\u001b[1;32m 195\u001b[0m \u001b[0mmsg\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mmsg\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\" in `{0}`\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfuncname\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mfuncname\u001b[0m \u001b[0;32melse\u001b[0m \u001b[0;34m\"\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrepr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 196\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mValueError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmsg\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 197\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 198\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mValueError\u001b[0m: Metadata inference failed in `apply`.\n\nYou have supplied a custom function and Dask is unable to \ndetermine the type of output that that function returns. \n\nTo resolve this please provide a meta= keyword.\nThe docstring of the Dask function you ran should have more information.\n\nOriginal error is below:\n------------------------\nValueError(\"time data 'foo' does not match format '%Y-%m-%d %H:%M:%S'\")\n\nTraceback:\n---------\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/utils.py\", line 175, in raise_on_meta_error\n yield\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py\", line 5513, in _emulate\n return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/utils.py\", line 900, in __call__\n return getattr(obj, self.method)(*args, **kwargs)\n File \"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/pandas/core/series.py\", line 4045, in apply\n mapped = lib.map_infer(values, f, convert=convert_dtype)\n File \"pandas/_libs/lib.pyx\", line 2228, in pandas._libs.lib.map_infer\n File \"<ipython-input-9-3a0da807679d>\", line 2, in <lambda>\n deltas = df['dropoff_datetime'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp()) - df['pickup_datetime'].apply(datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp())\n File \"/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_strptime.py\", line 577, in _strptime_datetime\n tt, fraction, gmtoff_fraction = _strptime(data_string, format)\n File \"/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_strptime.py\", line 359, in _strptime\n (data_string, format))\n"
]
}
],
"source": [
"from datetime import datetime\n",
"deltas = df['dropoff_datetime'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp()) - df['pickup_datetime'].apply(datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp())\n",
"\n",
"#How would you fix this?"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"def safe_strptime(x):\n",
" try:\n",
" time = datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timestamp()\n",
" return time\n",
" except:\n",
" return 0"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py:3522: UserWarning: \n",
"You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.\n",
"To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.\n",
" Before: .apply(func)\n",
" After: .apply(func, meta=('dropoff_datetime', 'int64'))\n",
"\n",
" warnings.warn(meta_warning(meta))\n",
"/Users/sanjaykrishnan/Documents/cmsc21800/venv/lib/python3.7/site-packages/dask/dataframe/core.py:3522: UserWarning: \n",
"You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.\n",
"To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.\n",
" Before: .apply(func)\n",
" After: .apply(func, meta=('pickup_datetime', 'int64'))\n",
"\n",
" warnings.warn(meta_warning(meta))\n"
]
}
],
"source": [
"deltas = df['dropoff_datetime'].apply(safe_strptime) - df['pickup_datetime'].apply(safe_strptime)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 31.1 s, sys: 381 ms, total: 31.5 s\n",
"Wall time: 33 s\n"
]
},
{
"data": {
"text/plain": [
"0 579.0\n",
"1 490.0\n",
"2 2215.0\n",
"3 205.0\n",
"4 1812.0\n",
"dtype: float64"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time deltas.head()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 33 s, sys: 575 ms, total: 33.5 s\n",
"Wall time: 36.3 s\n"
]
},
{
"data": {
"text/plain": [
"0 579.0\n",
"1 490.0\n",
"2 2215.0\n",
"3 205.0\n",
"4 1812.0\n",
" ... \n",
"95 635.0\n",
"96 2116.0\n",
"97 2886.0\n",
"98 1489.0\n",
"99 1503.0\n",
"Length: 100, dtype: float64"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time deltas.head(100)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's try to fix this!"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"deltas = deltas.repartition(1000)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 29.2 s, sys: 504 ms, total: 29.7 s\n",
"Wall time: 30.5 s\n"
]
},
{
"data": {
"text/plain": [
"0 579.0\n",
"1 490.0\n",
"2 2215.0\n",
"3 205.0\n",
"4 1812.0\n",
"dtype: float64"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time deltas.head()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 34.9 s, sys: 740 ms, total: 35.6 s\n",
"Wall time: 43.8 s\n"
]
},
{
"data": {
"text/plain": [
"0 579.0\n",
"1 490.0\n",
"2 2215.0\n",
"3 205.0\n",
"4 1812.0\n",
" ... \n",
"1995 1698.0\n",
"1996 1672.0\n",
"1997 1948.0\n",
"1998 951.0\n",
"1999 685.0\n",
"Length: 2000, dtype: float64"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time deltas.head(2000)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's try to get the total number of seconds across the whole dask frame"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],