TDM 30200: Project 2 — 2023
Motivation: In the previous project, we very slowly started to learn about asynchronous vs. synchronous programming. Mostly, you just had to describe scenarios, say whether they were synchronous or asynchronous, and explain things at a high level. In this project, we will dig into some asynchronous code and learn the very basics.
Context: This is the first project in a series of three projects that explore sync vs. async, parallelism, and concurrency.
Scope: Python, coroutines, tasks
Dataset(s)
The following questions will use the following dataset(s):
- /anvil/projects/tdm/data/flights/subset/*
Questions
Question 1
In the previous project, I gave you the following scenario.
You have 2 reports to write, and 2 wooden pencils. 1 sharpened pencil will write 1/2 of 1 report. You have a helper that is willing to sharpen 1 pencil at a time, for you, and that helper is able to sharpen a pencil in the time it takes to write 1/2 of 1 report. You can assume that you start with 2 sharpened pencils.
In the asynchronous version, you could start with the first sharpened pencil and write 1/2 of the first report in 5 seconds. Next, you hand the first pencil off to the assistant to be sharpened. While that is happening, you use the second pencil to write the second half of the first report. Next, you receive the first (now sharpened) pencil back from the assistant and hand the second pencil to the assistant to be sharpened. While the assistant is sharpening the second pencil, you write the first half of the second report. The assistant then returns the (now sharpened) second pencil to you so you can finish the second report. In theory, this process would take 20 seconds, because the assistant sharpens pencils at the same time you are writing. As a result, the 4 seconds it takes to sharpen both pencils once drop out of our synchronous solution of 24 seconds.
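The timing argument above is simple arithmetic. The sketch below assumes the durations used by the simulation code later in this project: 5 seconds to write half a report and 2 seconds to sharpen a pencil. The constant names are hypothetical.

```python
# Hypothetical constants matching the durations used in this project's simulation
WRITE_HALF_REPORT = 5  # seconds to write 1/2 of a report
SHARPEN_PENCIL = 2     # seconds to sharpen one pencil

HALVES = 4             # 2 reports x 2 halves each
SHARPENINGS = 2        # each pencil is sharpened once

# Synchronous: every step waits for the previous one to finish
sync_total = HALVES * WRITE_HALF_REPORT + SHARPENINGS * SHARPEN_PENCIL

# Asynchronous: each sharpening overlaps with writing the next half. Because
# sharpening (2 s) is shorter than writing a half (5 s), it never delays the writer.
async_total = HALVES * WRITE_HALF_REPORT

print(sync_total, async_total, sync_total - async_total)  # 24 20 4
```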
In this project we will examine how to write asynchronous code that simulates the scenario, in a variety of ways that will teach you how to write asynchronous code. At the end of the project, you will write your own asynchronous code that will speed up a web scraping task. Let’s get started!
Jupyter Lab already has its own event loop running. Async code needs an event loop, and trying to start a second one inside Jupyter causes problems. To get around this, we can use the nest_asyncio package, which allows event loops to be nested, so things work mostly as we would expect.
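To see why the nesting workaround is needed, you can reproduce the problem outside Jupyter. The sketch below is plain Python with no nest_asyncio. It calls asyncio.run from inside a coroutine, which is roughly what happens when a notebook cell calls asyncio.run while Jupyter's loop is already running. The helper names loop_is_running, answer, and try_nested_run are hypothetical.

```python
import asyncio

def loop_is_running() -> bool:
    """Return True if an asyncio event loop is running in the current thread."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:  # get_running_loop raises this when no loop is running
        return False

async def answer():
    return 42

async def try_nested_run():
    # Inside a coroutine a loop is already running, just like in a Jupyter cell
    coro = answer()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # discard the unused coroutine to avoid a "never awaited" warning
        return f"RuntimeError: {exc}"

print(loop_is_running())  # False at the top level of a script
message = asyncio.run(try_nested_run())
print(message)            # the error message asyncio.run raises when a loop is already running
```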
Fill in the skeleton code below to simulate the scenario. Use only the provided functions, sharpen_pencil and write_half_report, and the await keyword.
import asyncio
async def sharpen_pencil():
    await asyncio.sleep(2)
async def write_half_report():
    await asyncio.sleep(5)
async def simulate_story():
    # Write first half of report with first pencil
    # Hand pencil off to assistant to sharpen
    # Write second half of report with second pencil
    # Hand second pencil off to assistant to sharpen
    # Take first (now sharpened) pencil back from assistant
    # Write the first half of second report with first pencil
    # Take second (now sharpened) pencil back from assistant
    # and write the second half of the second report
    pass  # replace with your solution
Run the simulation in a new cell as follows.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
asyncio.run(simulate_story())
How long does your code take to run? Does it take the expected 20 seconds? If you have an idea why or why not, please try to explain. Otherwise, just say "I don’t know".
- Code used to solve this problem.
- Output from running the code.
Question 2
If you don’t have any experience writing asynchronous code, this might be pretty confusing! That’s okay; it is much easier to get confused writing asynchronous code than writing synchronous code. In fact, it is safe to say that writing good parallel and/or asynchronous code is significantly more difficult than writing code that is neither.
Let’s break it down. First, the asyncio.run function takes care of the bulk of the work. It starts the event loop, runs the coroutine you pass to it, finalizes asynchronous generators, and closes the threadpool. All you need to take away from this is that it takes care of a lot of ugly magic.
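Two details about asyncio.run are worth seeing directly. It returns whatever the coroutine returns, and it creates a new event loop for the call and closes it when the coroutine finishes. A small sketch for a plain Python script (the add coroutine is a hypothetical example):

```python
import asyncio

async def add(a, b):
    await asyncio.sleep(0.1)           # stand-in for some I/O
    loop = asyncio.get_running_loop()  # the loop that asyncio.run created for this call
    return a + b, loop

result, loop = asyncio.run(add(2, 3))
print(result)            # 5
print(loop.is_closed())  # True: asyncio.run closed its loop once add() finished
```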
Any function defined with async def is an asynchronous function. Calling an async function does not run its body. Instead, it returns a coroutine object nearly instantly. A coroutine is a function whose execution can be paused and resumed.
For example, if you call the following async function, its body will not execute. Calling it just creates a coroutine object.
async def foo():
    await asyncio.sleep(5)
    print("Hello")
foo()
<coroutine object foo at 0x7f8b8b8b9b50>
This result should appear nearly instantly, because the sleep code hasn’t actually run! In fact, if your async code runs much faster than expected, you may have forgotten to await a coroutine. Don’t worry too much about that for now.
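A quick way to confirm the difference between an async function and the coroutine it returns is to inspect them. This is a small sketch for a plain Python script, with the delay shortened to 0.1 seconds:

```python
import asyncio
import inspect

async def foo():
    await asyncio.sleep(0.1)
    print("Hello")

print(inspect.iscoroutinefunction(foo))  # True: foo was defined with async def
coro = foo()                             # nothing runs yet, so nothing is printed
print(asyncio.iscoroutine(coro))         # True: calling foo only created a coroutine object
coro.close()                             # discard it without running (avoids a "never awaited" warning)

asyncio.run(foo())                       # now the body actually runs and prints Hello
```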
To actually run the coroutine, you need to call the asyncio.run function.
asyncio.run(foo())
Hello
Of course, it doesn’t make sense to call asyncio.run for each and every coroutine you create. It makes more sense to start the event loop once and run all of the coroutines while it is running.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def foo():
    await asyncio.sleep(5)
    print("Hello")
async def bar():
    await asyncio.sleep(2)
    print("World")
async def main():
    await foo()
    await bar()
asyncio.run(main())
Run the code. What is the output?
Let’s take a step back. Why is asynchronous code useful? What do our asyncio.sleep calls represent? One of the slowest parts of a program is waiting for I/O (input/output). It takes time to wait for the operating system and hardware. If your program does a lot of I/O, you can take advantage of that time and perform other operations while waiting! In our example, this is what the asyncio.sleep calls represent: I/O!
Any program whose speed is limited by how fast it can perform I/O is called I/O bound. Any program whose speed is limited by how fast the CPU can process instructions is called CPU bound. Async programming can drastically speed up I/O-bound software!
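The waiting only helps if the event loop can see it. The sketch below contrasts asyncio.sleep, which hands control back to the event loop while it waits, with time.sleep, which blocks the whole thread. From the loop's point of view, a blocking wait behaves like CPU-bound work. The delays are shortened to fractions of a second, and the function names are hypothetical.

```python
import asyncio
import time

async def cooperative_wait(seconds):
    await asyncio.sleep(seconds)  # yields to the event loop while waiting

async def blocking_wait(seconds):
    time.sleep(seconds)           # blocks the thread, so nothing else can run

async def run_two(waiter):
    start = time.perf_counter()
    await asyncio.gather(waiter(0.3), waiter(0.2))
    return time.perf_counter() - start

cooperative = asyncio.run(run_two(cooperative_wait))
blocking = asyncio.run(run_two(blocking_wait))
print(f"cooperative: {cooperative:.1f} s")  # about 0.3 s: the two waits overlap
print(f"blocking:    {blocking:.1f} s")     # about 0.5 s: the waits run back to back
```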
Okay, back to the code from above. What is the output? You may have expected foo to run and then, while foo is "doing some I/O (sleeping)", bar would run, so that "World Hello" would be printed after a total of 5 seconds. While foo is sleeping, bar runs and finishes in 2 seconds, then control goes back to foo, which finishes 3 seconds later, right? Nope.
What happens is that when we await foo, Python suspends the execution of main until foo is done. Then it resumes execution of main and suspends it again until bar is done, for a total of approximately 7 seconds. We want both coroutines to run concurrently, not one at a time! How do we fix it? The easiest way is to use asyncio.gather.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def foo():
    await asyncio.sleep(5)
    print("Hello")
async def bar():
    await asyncio.sleep(2)
    print("World")
async def main():
    await asyncio.gather(foo(), bar())
asyncio.run(main())
asyncio.gather takes any number of awaitable objects as arguments (coroutines are awaitable objects), wraps each coroutine in a task, and runs them concurrently. The code above should now work as expected and take approximately 5 seconds instead of 7. We save 2 seconds because foo and bar run concurrently: while foo is sleeping, bar runs and completes.
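Two practical details of asyncio.gather are easy to miss. Its awaitables are passed as separate arguments, so a list has to be unpacked with *. Its results also come back in the order the arguments were given, not the order in which they finished. A small sketch for a plain Python script, where fetch is a hypothetical stand-in for I/O of different lengths:

```python
import asyncio
import time

async def fetch(name, delay):
    await asyncio.sleep(delay)  # stand-in for I/O that takes `delay` seconds
    return name

async def main():
    coros = [fetch("slow", 0.3), fetch("fast", 0.1)]
    # "slow" finishes last, but gather returns results in argument order
    return await asyncio.gather(*coros)

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)             # ['slow', 'fast']
print(f"{elapsed:.1f} s")  # roughly 0.3 s (the longest single delay), not 0.4 s
```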
What is a task? You can read about tasks in the asyncio documentation. A task is an object that runs a coroutine on the event loop. The easiest way to create a task is to use the asyncio.create_task function. For example, if we scheduled foo as a task instead of awaiting both foo and bar, you would get mostly the same result as with asyncio.gather.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def foo():
    await asyncio.sleep(5)
    print("Hello")
async def bar():
    await asyncio.sleep(2)
    print("World")
async def main():
    asyncio.create_task(foo())
    await bar()
asyncio.run(main())
As you can see, "World" prints after about 2 seconds, and "Hello" prints 3 seconds later, for a total execution time of 5 seconds. That being said, something is odd about our output.
World
CPU times: user 2.57 ms, sys: 1.06 ms, total: 3.63 ms
Wall time: 2 s
Hello
It says that it executed in 2 seconds, not 5. In addition, "Hello" prints after Jupyter says our execution completed. Why? If you read the documentation for asyncio.create_task, you will see that it takes a coroutine (in our case, the output of foo()) and schedules it as a task on the event loop returned by asyncio.get_running_loop(). This is the critical part. With nest_asyncio applied, the coroutine created by foo() is scheduled on the same event loop that Jupyter Lab is running on. So even though our call to asyncio.run returned, foo kept running until it completed instead of being cancelled when main finished! To observe this, open a terminal and run the following code to launch a Python interpreter:
module use /anvil/projects/tdm/opt/core
module load tdm
module load python/f2022-s2023
python3
Then, in the Python interpreter, run the following. You may need to type it out manually.
import asyncio
async def foo():
    await asyncio.sleep(5)
    print("Hello")
async def bar():
    await asyncio.sleep(2)
    print("World")
async def main():
    asyncio.create_task(foo())
    await bar()
asyncio.run(main())
As you can see, the output is not the same as when you run it from within the Jupyter notebook. Instead of:
World
CPU times: user 2.57 ms, sys: 1.06 ms, total: 3.63 ms
Wall time: 2 s
Hello
You should get:
World
This is because, this time, there is no confusion about which event loop to use when scheduling a task. Once main finishes, asyncio.run cancels any tasks that are still scheduled and closes the event loop, even if those tasks haven’t finished (like foo, in our example). If you wanted to modify main to wait for foo to complete, you could await the task after you await bar().
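Here is a minimal sketch of that modification, written to run as a plain Python script. In Jupyter with nest_asyncio applied, the behavior differs for the reasons described above. The delays are shortened, and the completed list is a hypothetical addition used only to record which coroutines actually finished.

```python
import asyncio
import time

completed = []  # hypothetical: records which coroutines ran to completion

async def foo():
    await asyncio.sleep(0.5)
    completed.append("foo")

async def bar():
    await asyncio.sleep(0.2)
    completed.append("bar")

async def main_without_await():
    asyncio.create_task(foo())  # scheduled, but main never waits for it
    await bar()                 # main returns after about 0.2 s

async def main_with_await():
    task = asyncio.create_task(foo())  # foo starts as soon as main yields to the loop
    await bar()
    await task                         # keep main alive until foo finishes

asyncio.run(main_without_await())
first_run = list(completed)
print(first_run)  # ['bar']: asyncio.run cancelled foo when main returned

completed.clear()
start = time.perf_counter()
asyncio.run(main_with_await())
elapsed = time.perf_counter() - start
print(completed)           # ['bar', 'foo']
print(f"{elapsed:.1f} s")  # about 0.5 s, not 0.7 s, because foo and bar overlapped
```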
Note that this will work:
But this will not:
The reason is that as soon as you call
In the same way that asyncio.create_task immediately schedules coroutines as tasks on the event loop, so does asyncio.gather. In a previous example, we awaited our call to asyncio.gather.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def foo():
    await asyncio.sleep(5)
    print("Hello")
async def bar():
    await asyncio.sleep(2)
    print("World")
async def main():
    await asyncio.gather(foo(), bar())
asyncio.run(main())
World
Hello
CPU times: user 3.41 ms, sys: 1.96 ms, total: 5.37 ms
Wall time: 5.01 s
Awaiting asyncio.gather is critical. Otherwise, main would finish immediately, before either foo or bar had finished.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def foo():
    await asyncio.sleep(5)
    print("Hello")
async def bar():
    await asyncio.sleep(2)
    print("World")
async def main():
    asyncio.gather(foo(), bar())
asyncio.run(main())
CPU times: user 432 µs, sys: 0 ns, total: 432 µs
Wall time: 443 µs
World
Hello
As you can see, since we did not await our asyncio.gather call, main ran and finished immediately. The only reason "World" and "Hello" printed is that foo and bar finished running on the event loop that Jupyter uses, instead of on the loop from our call to asyncio.run. If you ran the same code from a Python interpreter instead of from Jupyter Lab, neither "World" nor "Hello" would print.
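The sketch below shows both halves of that point in a plain Python script. asyncio.gather wraps its coroutines in tasks as soon as it is called, so they start running the first time the calling coroutine yields to the event loop. You still need to await the object gather returns so that main outlives the tasks. The worker coroutine and the started list are hypothetical.

```python
import asyncio

async def main():
    started = []

    async def worker(name):
        started.append(name)      # runs as soon as this task gets its first turn
        await asyncio.sleep(0.1)  # stand-in for I/O
        return name

    pending = asyncio.gather(worker("foo"), worker("bar"))  # tasks are scheduled here
    await asyncio.sleep(0)        # yield to the event loop once
    snapshot = list(started)      # both workers have already started
    results = await pending       # still await, so main does not finish before them
    return snapshot, results

snapshot, results = asyncio.run(main())
print(snapshot)  # ['foo', 'bar']
print(results)   # ['foo', 'bar']
```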
I know this is a lot to take in for a single question. If you aren’t quite following at this point, I’d highly encourage you to post questions in Piazza before continuing, or to reread things until they start to make sense.
Modify your simulate_story function from question (1) so that sharpen_pencil runs concurrently with write_half_report, and the total execution time is about 20 seconds.
Some important notes to keep in mind:
- Code used to solve this problem.
- Output from running the code.
Question 3
That last question was quite a bit to take in! It is okay if it hasn’t all clicked yet. I’d encourage you to post questions in Piazza and to keep experimenting with simple async examples until things make more sense. Your questions will also help us explain things better and improve the project for the next group of students!
There are a couple of straightforward ways you could solve the previous question (technically, there are even more). One way involves queuing up the sharpen_pencil coroutines as tasks that run concurrently and awaiting them at the end. The other involves using asyncio.gather to queue up select write_half_report and sharpen_pencil coroutines to run concurrently, and awaiting them.
While both of these methods do a great job of simulating our simple story, there may be situations where more control is needed. In such circumstances, asyncio’s synchronization primitives may be useful!
In particular, read about the Event primitive. This primitive allows us to notify one or more async tasks that something has happened. This is particularly useful if you want some async code to wait for other async code to run before continuing. Cool, how does it work? Let’s say I want to yell, but before I yell, I want the megaphone to be ready.
First, create an Event object to represent the event (in this case, the megaphone being ready).
import asyncio
async def yell(words, wait_for):
    print(f"{words.upper()}")
# create an event
megaphone_ready = asyncio.Event()
To wait until the event has occurred before continuing, you just need to await the coroutine created by calling the event’s wait method. So in our case, we can add await wait_for.wait() before we yell in the yell function.
async def yell(words, wait_for):
    await wait_for.wait()
    print(f"{words.upper()}")
By default, an Event is not set (its internal flag is False, so is_set() returns False), since the event has not occurred yet. The yell task will keep waiting on our event until the event is marked as set. To mark our event as set, we use the set method.
import asyncio
async def yell(words, wait_for):
    await wait_for.wait()
    print(f"{words.upper()}")
async def main():
    megaphone_ready = asyncio.Event()  # by default, it is not ready
    # create our yell task. Remember, tasks are immediately scheduled
    # on the event loop to run. At this point, the await wait_for.wait()
    # part of our yell function will prevent the task from moving
    # forward to the print statement until the event is set.
    yell_task = asyncio.create_task(yell("Hello", megaphone_ready))
    # let's say we have to dust off the megaphone for it to be ready
    # and it takes 2 seconds to do so
    await asyncio.sleep(2)
    # now, since we've dusted off the megaphone, we can mark it as ready
    megaphone_ready.set()
    # at this point in time, the await wait_for.wait() part of our code
    # from the yell function will be complete, and the yell function
    # will move on to the print statement and actually yell
    # Finally, we await yell_task so that main does not return (and
    # asyncio.run does not cancel the task) before the yell has happened.
    await yell_task
asyncio.run(main())
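One set() call releases every task that is waiting on the same Event, and is_set() lets you check the flag at any time. A small sketch for a plain Python script, with a hypothetical listener coroutine and made-up names:

```python
import asyncio

async def listener(name, ready, heard):
    await ready.wait()  # blocks until ready.set() is called
    heard.append(name)

async def main():
    ready = asyncio.Event()
    heard = []
    tasks = [asyncio.create_task(listener(n, ready, heard)) for n in ("ann", "bob", "cal")]
    await asyncio.sleep(0.1)                # give the listeners time to start waiting
    before = (ready.is_set(), list(heard))  # nobody has heard anything yet
    ready.set()                             # a single set() wakes every waiting task
    await asyncio.gather(*tasks)
    return before, ready.is_set(), sorted(heard)

before, after, heard = asyncio.run(main())
print(before)  # (False, [])
print(after)   # True
print(heard)   # ['ann', 'bob', 'cal']
```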
Consider each of the following as a separate event:
- Writing the first quarter of the report
- Writing the second quarter of the report
- Writing the third quarter of the report
- Writing the fourth quarter of the report
- Sharpening the first pencil
- Sharpening the second pencil
Use the Event primitive to make our code run as intended, concurrently. Use the following code as a skeleton for your solution. Do not modify the code; just make additions.
%%time
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def write_quarter(current_event, events_to_wait_for=None):
    # TODO: if events_to_wait_for is not None
    # loop through the events and await them
    await asyncio.sleep(5)
    # TODO: at this point, the essay quarter has
    # been written and we should mark the current
    # event as set
async def sharpen_pencil(current_event, events_to_wait_for=None):
    # TODO: if events_to_wait_for is not None
    # loop through the events and await them
    await asyncio.sleep(2)
    # TODO: at this point, the pencil has
    # been sharpened and we should mark the current
    # event as set
async def simulate_story():
    # TODO: declare each of the 6 events in our story
    # TODO: add each function call to a list of tasks
    # to be run concurrently. Should be something similar to
    # tasks = [write_quarter(something, [something,]), ...]
    tasks = []
    await asyncio.gather(*tasks)
asyncio.run(simulate_story())
It should take about 20 seconds to run.
- Code used to solve this problem.
- Output from running the code.
Question 4
While it is certainly useful to have some experience with async programming in Python, most data scientists will encounter it when writing APIs with something like FastAPI, where a deep knowledge of async programming isn’t really needed.
What will be pretty common is the need to speed up code. One of the primary ways to do that is to parallelize your code.
In the previous project, in question (5), you described an operation that you could perform on the entire flights dataset (/anvil/projects/tdm/data/flights/subset). In a situation like this, where you have a collection of neatly formatted datasets, a good first step is to write a function that accepts two paths as arguments. The first path could be the absolute path to the dataset to be processed. The second path could be the absolute path of the intermediate output file. The function would then process the dataset and write out the intermediate calculations.
For example, let’s say you wanted to count how many flights there are in the dataset as a whole. You could write a function that reads in one dataset, counts its flights, and writes a file containing that number. This is easily parallelizable because each file can be processed independently, in parallel, and at the very end you sum up the numbers in the output files.
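Here is a minimal sketch of such a "parallel-ready" function for the flight-counting example. It assumes each file is a CSV with a single header row, so check that against the actual files in /anvil/projects/tdm/data/flights/subset before relying on it. The function names count_flights and combine_counts are hypothetical. The demo writes two tiny made-up CSV files to a temporary directory instead of touching the real data.

```python
import csv
import tempfile
from pathlib import Path

def count_flights(input_path, output_path):
    """Count data rows in one CSV (excluding the header) and write the count to output_path.

    Each call depends only on its own two paths, so calls for different files
    can later run in parallel without interfering with each other.
    """
    with open(input_path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row (assumed to exist)
        n_flights = sum(1 for _ in reader)
    Path(output_path).write_text(f"{n_flights}\n")
    return n_flights

def combine_counts(output_paths):
    """Final, cheap step: sum the intermediate counts."""
    return sum(int(Path(p).read_text()) for p in output_paths)

# Demo on two small, made-up files
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    (tmp / "a.csv").write_text("Year,Origin,Dest\n1987,ORD,LAX\n1987,JFK,SFO\n1987,ATL,DEN\n")
    (tmp / "b.csv").write_text("Year,Origin,Dest\n1988,SEA,BOS\n1988,DFW,MIA\n")
    outputs = []
    for name in ("a.csv", "b.csv"):
        out = tmp / f"{name}.count"
        count_flights(tmp / name, out)  # each call is independent of the others
        outputs.append(out)
    total = combine_counts(outputs)

print(total)  # 5
```

Because count_flights only reads one input and writes one output, the loop above could later be swapped for a parallel map over files without changing the function itself.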
Write a function that is "ready" to be parallelized and that follows the operation you described in question (5) in the previous project. Test out the function on at least 2 of the datasets in /anvil/projects/tdm/data/flights/subset.
In the next project, we will parallelize and run some benchmarks.
- Code used to solve this problem.
- Output from running the code.
Please make sure to double check that your submission is complete, and contains all of your code and output before submitting. If you are on a spotty internet connection, it is recommended to download your submission after submitting it to make sure what you think you submitted, was what you actually submitted. In addition, please review our submission guidelines before submitting your project.