In a previous note, I discussed running coroutines in a non-blocking manner using gather. This approach works well when you have a known number of coroutines that you want to run in a non-blocking manner. However, if you have tens, hundreds, or more tasks, especially when network calls are involved, it can be important to limit concurrency. We can use a semaphore to limit the number of coroutines that are running at once by blocking until other coroutines have finished executing.

import asyncio

async def constrained_execution(funcs, max_runs):
  jobs = []
  semaphore = asyncio.Semaphore(max_runs)

  async def job(fn):
    async with semaphore:
      return await fn()

  for func in funcs:
  return await asyncio.gather(*jobs)

async def process(item):
  print(f"Processing item {item}")
  await asyncio.sleep(1)
  return item

async def main():
  fns = []
  for item in range(100):
    fns.append(lambda item=item: process(item))
  results = await constrained_execution(fns, 10)

if __name__ == "__main__":

This approach is especially useful when you want to improve performance by making network requests using coroutines, but don’t want to exceed a specific number of requests in parallel, due to issues like rate limiting or API availability.

import aiohttp
import json

async def fetch_post_title(session, id_):
    url = f"{id_}"
    async with session.get(url) as response:
        response = json.loads(await response.text())
        return response[0]["title"]

async def main():

  ids = range(1, 101)
  async with aiohttp.ClientSession() as session:
    fns = [
      lambda id_=id_: fetch_post_title(session, id_) for id_ in ids
    results = await constrained_execution(fns, 5)
    print(len(results)) # 100
    print(results) # [ '...', '...', ... ]

if __name__ == "__main__":

A function like constrained_execution can be useful for scripting tasks you encounter in your day to day.