I spent two years avoiding asyncio. Every time I touched it, something would hang, or silently fail, or give me a RuntimeError that made zero sense. I stuck with requests and threading like everyone else, convinced async was for people who enjoyed suffering.

People collaborating at a table
The look on my face when someone says just use asyncio ( it is not confidence )

Then I had to scrape 12,000 pages in under an hour. Threading got me 60% of the way. Multiprocessing ate my RAM. Asyncio got it done in 23 minutes. I have not looked back.

The Mental Model That Finally Clicked

Asyncio is not multithreading. It is not multiprocessing. It is a single thread that switches between tasks when they are waiting on I/O. Think of it as a very efficient receptionist who never stands still ( not as a team of workers ).

The receptionist answers a call, puts it on hold, answers the next call, checks if the first one is off hold, and so on. No call gets serviced faster. But all calls get serviced eventually, and the receptionist is never idle.

Coffee cup on table
Fuel for the event loop. No judgment.

This model only works when your tasks are I/O-bound. Network requests, file reads, database queries. If you are crunching numbers, asyncio gives you nothing. Use multiprocessing for that.

The Minimum Viable Asyncio

Here is the simplest useful async program I know:

import asyncio

async def fetch(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)  # simulating a request
    print(f"Done {url}")
    return url

async def main():
    tasks = [fetch(f"http://example.com/{i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

Five requests, one second each. Total time: ~1 second, not 5. That is the whole point.

The two things you actually need to understand: async def makes a coroutine. await yields control back to the event loop. Everything else is implementation details.

The Mistakes I Made Repeatedly

Busy workspace with monitors
Where I debugged asyncio at 2am. Not my finest hour.

Mistake 1: calling an async function without await. This does not run the function. It creates a coroutine object and throws it away. No error, no warning, just nothing happens. Python should scream about this, and in 3.12+ it kinda does, but in older versions you get silence.

# This does nothing useful
async def save(data):
    await write_to_db(data)

save(result)  # <- creates coroutine, never runs it
await save(result)  # <- actually runs it

Mistake 2: blocking the event loop. If you call time.sleep() or requests.get() inside an async function, you block the entire event loop. Every other coroutine stops. This is the number one way to make async code slower than sync code.

# BAD: blocks everything
async def fetch(url):
    import requests
    return requests.get(url).json()  # blocks the event loop!

# GOOD: offload to thread pool
async def fetch(url):
    import requests
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, lambda: requests.get(url).json())

Or just use httpx with async support. Seriously, stop using requests in async code.

Mistake 3: forgetting asyncio.gather. Running coroutines sequentially defeats the purpose:

# Sequential and slow
async def main():
    r1 = await fetch(url1)
    r2 = await fetch(url2)
    r3 = await fetch(url3)

# Parallel and fast
async def main():
    results = await asyncio.gather(
        fetch(url1),
        fetch(url2),
        fetch(url3)
    )

httpx Over requests

I switched from requests to httpx for every async project. The API is almost identical, and it supports async natively. No thread pool hacks.

import httpx
import asyncio

async def fetch_all(urls):
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.json() for r in responses]

# 50 concurrent requests, clean code, no threading
urls = [f"https://api.example.com/item/{i}" for i in range(50)]
results = asyncio.run(fetch_all(urls))
Technology workspace
Clean async code. No thread pools, no tears.

Add limits with asyncio.Semaphore if the API has rate limits ( it always does ):

async def fetch_limited(urls, max_concurrent=10):
    sem = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        async def fetch_one(url):
            async with sem:
                return await client.get(url)
        tasks = [fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks)

Error Handling That Does Not Eat Your Data

asyncio.gather with return_exceptions=False ( the default ) cancels everything if one task fails. This is almost never what you want. Use return_exceptions=True and handle errors after:

results = await asyncio.gather(*tasks, return_exceptions=True)

for r in results:
    if isinstance(r, Exception):
        print(f"Task failed: {r}")
    else:
        process(r)

If you need per-task timeouts, use asyncio.wait_for inside each coroutine. If you need a global timeout, use asyncio.timeout ( 3.11+ ).

When I Still Use Threading

I use asyncio for I/O-bound work ( network, files, APIs ). I use threading for one specific thing: wrapping sync libraries I cannot replace. Like some database drivers that have no async version. The run_in_executor trick is fine for that.

I use multiprocessing for CPU-bound work ( image processing, data transforms ). Never asyncio.

Mixing them is fine. asyncio.gather + run_in_executor for the sync stuff. Just be honest about what is actually async and what is just pretending.

The Setup I Use Now

import asyncio
import httpx

MAX_CONCURRENT = 20
TIMEOUT = 30

async def run_pipeline(urls):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    
    async def safe_fetch(client, url):
        async with sem:
            try:
                return await asyncio.wait_for(
                    client.get(url), timeout=TIMEOUT
                )
            except asyncio.TimeoutError:
                return None
    
    async with httpx.AsyncClient() as client:
        tasks = [safe_fetch(client, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Run it
results = asyncio.run(run_pipeline(my_urls))

Semaphore for rate limiting, timeouts for stuck requests, return_exceptions for partial failures. This pattern handles 80% of what I need. The other 20% is debugging race conditions in my own code, which asyncio does not save me from, unfortunately. :)