Dai Generatori ad Asyncio: async/await non e' magia

Dai Generatori ad Asyncio: async/await non e' magia

Dai Generatori ad Asyncio: async/await non e’ magia

Un viaggio dentro le coroutine di Python — dal primo yield fino all’event loop

Questo articolo sviscera, passo per passo, come async/await in Python non sia un meccanismo magico ma l’evoluzione naturale dei generatori. Ogni concetto e’ un gradino che porta al successivo. Niente salti.

Non ci limiteremo a mostrare codice: scenderemo nei dettagli del protocollo iteratore, del bytecode, degli stack frame congelati, dell’espansione completa di yield from secondo PEP 380, del protocollo __await__, e costruiremo un event loop funzionante pezzo per pezzo.

Tutti gli esempi di codice sono verificati e funzionanti con Python 3.10+.


Il talk

Questo articolo è la versione scritta del talk “I generatori: da yield ad asyncio che ho tenuto insieme a Francesco Panico durante il PyCon Italia 2026.

PDF

    / [pdf]

Elevator pitch

async/await è probabilmente la prima riga di asyncio che ognuno di noi ha scritto. Cinque parole che sembrano banali. Ma cosa succede davvero quando il vostro codice incontra await? Cosa fa Python? Cosa fa l’event loop? Dove va a finire il controllo? Questo talk risponde a quelle domande ripercorrendo l’evoluzione del linguaggio una PEP alla volta: partiamo dal primo yield, attraversiamo le tappe che hanno trasformato i generatori in coroutine, e costruiamo un event loop da zero per dimostrare che dietro asyncio non c’è nessuna magia.

Descrizione

async/await non è un meccanismo magico, ma l’evoluzione naturale dei generatori. Per dimostrarlo ripercorriamo il filo storico una PEP alla volta, senza salti: ogni concetto è un gradino che porta al successivo.

Si parte dalla PEP 255 (Python 2.2), dove nasce yield e una funzione impara per la prima volta a sospendersi, congelando il proprio frame. Con la PEP 342 (Python 2.5) yield diventa un’espressione: arrivano send(), throw() e close(), e il generatore si trasforma in una coroutine che dialoga con il chiamante e ha un ciclo di vita controllabile dall’esterno — lo stesso meccanismo con cui asyncio cancella un Task. La PEP 380 (Python 3.3) introduce yield from, il tunnel bidirezionale che rende possibile comporre i generatori e propagare il valore di ritorno attraverso StopIteration: è il ponte concettuale verso await. La PEP 492 (Python 3.5) formalizza tutto con la sintassi dedicata async/await e il protocollo __await__, mentre la PEP 525 (Python 3.6) chiude il cerchio con gli async generators.

A metà strada ci fermiamo a costruire un event loop funzionante da zero — prima un round-robin di quindici righe, poi una versione con supporto ai timer — per mostrare che l’event loop di asyncio è semplicemente un ciclo che chiama send() sulle coroutine quando i loro awaitable sono pronti. Niente magia: solo generatori, fino in fondo.

async/await non è magia. È il risultato di 14 anni di evoluzione del modello dei generatori.


Indice

  1. Micro-ripasso: iterable e iterator
  2. Nasce yield: la funzione che si sospende
  3. Dentro al generatore: frame, stato, bytecode
  4. send(): comunicazione bidirezionale
  5. Lifecycle management: throw() e close()
  6. Il problema: composizione incompleta
  7. Il valore di ritorno: il pezzo mancante
  8. L’espansione completa di yield from
  9. Composizione reale: brew sequence
  10. Due concetti, una sintassi
  11. Da yield from ad await
  12. Il protocollo await
  13. L’event loop: da zero
  14. asyncio.run(): la forma moderna
  15. Async generators: PEP 525
  16. Use case reali
  17. Riepilogo e filo narrativo

1. Micro-ripasso: iterable e iterator

Prima di parlare di generatori, dobbiamo capire il protocollo su cui si appoggiano: il protocollo iteratore.

In Python, un oggetto e’ iterabile se implementa __iter__(), che restituisce un iteratore. Un iteratore e’ un oggetto che implementa __next__(), che restituisce il prossimo valore o solleva StopIteration quando e’ esaurito.

it = iter([1, 2, 3])

next(it)  # 1
next(it)  # 2
next(it)  # 3
next(it)  # StopIteration

Cosa fa davvero un ciclo for

Il ciclo for e’ zucchero sintattico. Ecco cosa succede sotto il cofano:

# Questo:
for x in [1, 2, 3]:
    print(x)

# Equivale a questo:
_iter = iter([1, 2, 3])    # chiama __iter__()
while True:
    try:
        x = next(_iter)     # chiama __next__()
    except StopIteration:
        break
    print(x)

Possiamo verificarlo con un iterabile “verboso”:

class VerboseIterable:
    def __iter__(self):
        print("  __iter__ called")
        return VerboseIterator()

class VerboseIterator:
    def __init__(self):
        self.n = 0
    def __next__(self):
        if self.n >= 3:
            print(f"  __next__ → raising StopIteration")
            raise StopIteration
        self.n += 1
        print(f"  __next__ → returning {self.n}")
        return self.n

for x in VerboseIterable():
    print(f"    body: x={x}")

Output:

  __iter__ called
  __next__ → returning 1
    body: x=1
  __next__ → returning 2
    body: x=2
  __next__ → returning 3
    body: x=3
  __next__ → raising StopIteration

Il for chiama __iter__() una volta, poi __next__() ripetutamente finche’ non arriva StopIteration. L’eccezione viene catturata silenziosamente — non la vediamo mai nel codice che usa for.

Questo e’ tutto quello che ci serve sapere sugli iteratori. Ora il problema: e se volessi scrivere io un oggetto che produce valori uno alla volta?

Scrivere una classe con __iter__ e __next__ richiede gestire lo stato manualmente. Per algoritmi ricorsivi o con molto stato interno, diventa rapidamente un incubo. Serviva un modo migliore.


2. Nasce yield: la funzione che si sospende (PEP 255)

La PEP 255 (Python 2.2) introduce i generatori: funzioni che contengono la keyword yield. Quando vengono chiamate, non eseguono il corpo — restituiscono un oggetto generator-iterator che controlla l’esecuzione.

def count():
    yield 1
    yield 2
    yield 3

c = count()       # no code is executed!
next(c)  # 1      # now executes until first yield
next(c)  # 2      # resumes from suspension point
next(c)  # 3
next(c)  # StopIteration

La cosa fondamentale: quando il generatore incontra yield, non termina. Si sospende. Tutta l’esecuzione si congela: variabili locali, posizione nel codice, stack di valutazione. Quando chiamiamo next(), l’esecuzione riprende esattamente dal punto in cui si era fermata.

def demo():
    x = 10
    yield x    # suspends here, x = 10
    x += 1
    yield x    # resumes here, x = 11

d = demo()
next(d)  # 10
next(d)  # 11  ← x non riparte da 10! Il frame e' stato congelato

Un generatore e’ sia iterabile che iteratore

Un dettaglio importante: l’oggetto generatore implementa entrambi i protocolli. __iter__ restituisce se stesso:

g = count()
print(hasattr(g, '__iter__'))  # True
print(hasattr(g, '__next__'))  # True
print(iter(g) is g)            # True — iter() restituisce lo stesso oggetto

Questo significa che un generatore puo’ essere usato direttamente in un for:

for n in count():
    print(n)  # 1, 2, 3

La rivelazione

Un generatore non e’ una funzione che ritorna piu’ valori. E’ una funzione che puo’ essere sospesa.

Come dice la PEP 255: “a kind of function that can return an intermediate result to its caller, but maintaining the function’s local state so the function can be resumed again right where it left off.”

yield produce un valore e sospende l’esecuzione. La sospensione e’ la cosa importante, non il valore. E chi decide quando riparte? Chi chiama next().


3. Dentro al generatore: frame, stato, bytecode

Per capire davvero cosa succede, dobbiamo guardare dentro al generatore.

Il frame object

Ogni generatore ha un attributo gi_frame che punta al frame di esecuzione congelato. Possiamo ispezionarlo:

import inspect

def inspectable():
    x = 42
    yield x
    x += 1
    yield x

gen = inspectable()

# Prima di next(): il frame esiste ma non ha ancora eseguito nulla
print(gen.gi_frame.f_locals)   # {}

next(gen)
print(gen.gi_frame.f_locals)   # {'x': 42}
print(gen.gi_frame.f_lasti)    # 14 — offset dell'ultima istruzione bytecode

next(gen)
print(gen.gi_frame.f_locals)   # {'x': 43}
print(gen.gi_frame.f_lasti)    # 40 — avanzato!

Il frame contiene:

  • f_locals — le variabili locali (persistono tra le sospensioni!)
  • f_lasti — l’offset dell’ultima istruzione bytecode eseguita
  • f_code — il code object della funzione
  • f_lineno — il numero di riga corrente

Dopo che il generatore termina (StopIteration), gi_frame diventa None:

try:
    next(gen)
except StopIteration:
    pass
print(gen.gi_frame)  # None — il frame e' stato deallocato

Il bytecode: cosa genera yield

Possiamo disassemblare un generatore per vedere il bytecode:

import dis

def simple_gen():
    x = 1
    yield x
    x += 1
    yield x

dis.dis(simple_gen)

Output (semplificato):

  RETURN_GENERATOR           # returns the generator-iterator
  POP_TOP
  RESUME              0

  LOAD_SMALL_INT      1      # x = 1
  STORE_FAST          0 (x)

  LOAD_FAST           0 (x)
  YIELD_VALUE         0      # ← THIS is yield: suspends and produces x
  RESUME              5      # resume point after next()/send()
  POP_TOP

  LOAD_FAST           0 (x)
  LOAD_SMALL_INT      1
  BINARY_OP           13 (+=) # x += 1
  STORE_FAST          0 (x)

  LOAD_FAST           0 (x)
  YIELD_VALUE         0      # secondo yield
  RESUME              5
  POP_TOP

  LOAD_CONST          1 (None)
  RETURN_VALUE               # fine → StopIteration

I punti chiave:

  1. RETURN_GENERATOR — all’inizio, la funzione non esegue il corpo ma restituisce un generator-iterator
  2. YIELD_VALUE — sospende l’esecuzione e restituisce il valore al chiamante
  3. RESUME — il punto esatto dove l’esecuzione riprende dopo next() o send()
  4. RETURN_VALUE alla fine — quando il generatore finisce, Python solleva StopIteration

Macchina a stati del generatore

Un generatore attraversa quattro stati:

  • GEN_CREATED — appena creato, nessun codice eseguito
  • GEN_RUNNING — in esecuzione (non puo’ essere ripreso: ValueError)
  • GEN_SUSPENDED — congelato su un yield, il frame e’ intatto
  • GEN_CLOSED — terminato, gi_frame e’ None

Le transizioni corrispondono esattamente ai metodi che abbiamo visto: next()/send() per avanzare, yield per sospendere, close() per terminare.

Protezione dalla rientranza

Un generatore non puo’ essere ripreso mentre e’ in esecuzione:

def reentrant():
    yield next(gen)  # reentrancy attempt

gen = reentrant()
try:
    next(gen)
except ValueError as e:
    print(e)  # "generator already executing"

Come dice la PEP 255: “A generator cannot be resumed while it is actively running.”


4. send(): comunicazione bidirezionale (PEP 342)

La PEP 255 dava ai generatori la capacita’ di produrre valori. Ma non potevano riceverli. La PEP 342 (Python 2.5) cambia tutto: yield diventa un’espressione (prima era solo uno statement), e il suo valore e’ determinato da chi riprende il generatore.

Da statement a espressione

Prima della PEP 342:

yield value        # statement only, no return value

Dopo la PEP 342:

received = yield value   # expression: received contains what was sent

Come dice la PEP 342: “In effect, a yield-expression is like an inverted function call; the argument to yield is in fact returned (yielded) from the currently executing function, and the return value of yield is the argument passed in via send().”

send(): semantica esatta

send(value) fa due cose:

  1. Il value diventa il risultato dell’espressione yield nel generatore
  2. Riprende l’esecuzione fino al prossimo yield
def accumulator():
    total = 0
    while True:
        value = yield total    # yield produce total, riceve value
        total += value

acc = accumulator()
next(acc)        # 0   (avvio: esegue fino al primo yield, produce total=0)
acc.send(10)     # 10  (value=10, total=10, produce 10)
acc.send(5)      # 15  (value=5, total=15, produce 15)
acc.send(100)    # 115

Perche’ il primo send() deve essere None

Quando il generatore e’ appena stato creato, non ha ancora raggiunto nessun yield. Non c’e’ un’espressione yield a cui assegnare il valore. Percio':

acc = accumulator()
try:
    acc.send(42)  # TypeError!
except TypeError as e:
    print(e)  # "can't send non-None value to a just-started generator"

next(gen) e’ equivalente a gen.send(None). Il primo send() deve essere None (oppure si usa next()).

Come dice la PEP 342: “Because generator-iterators begin execution at the top of the generator’s function body, there is no yield expression to receive a value when the generator has just been created.”

Regole di parentesizzazione di yield

Dato che yield e’ ora un’espressione, ci sono regole su quando servono le parentesi:

# Legal: yield as sole expression on right side of assignment
x = yield 42
x = yield

# Legale: con parentesi in espressioni piu' complesse
x = 12 + (yield 42)
foo((yield 42))

# ILLEGALE: senza parentesi in espressioni complesse
x = 12 + yield 42    # SyntaxError
foo(yield 42, 12)     # SyntaxError

Multitasking cooperativo

Con send(), il generatore non e’ piu’ solo un produttore. E’ una coroutine cooperativa:

yield  → punto di sospensione + produce un valore
send() → riprendi l'esecuzione + inietta un valore

La funzione decide DOVE sospendersi  (yield)
Il chiamante decide QUANDO riprenderla (next/send)

La differenza fondamentale con i thread: i thread vengono interrotti dall’esterno in momenti arbitrari — e’ preemption. Qui la funzione sceglie volontariamente quando cedere il controllo. E’ cooperazione.

Se posso avere piu’ funzioni cosi’… e alternarle manualmente… ho costruito un mini scheduler. La PEP 342 include proprio un esempio di trampoline scheduler che fa esattamente questo.


5. Lifecycle management: throw() e close() (PEP 342)

Se una coroutine puo’ restare sospesa per tempo indefinito, come la fermiamo in modo sicuro? Come gestiamo gli errori? La PEP 342 aggiunge tre cose: throw(), close(), e GeneratorExit.

throw() — iniezione di eccezioni

throw(type, value, traceback) inietta un’eccezione nel punto esatto in cui il generatore e’ sospeso, come se raise venisse eseguito su quella riga di yield.

def resilient_worker():
    while True:
        try:
            yield "working"
        except ValueError as e:
            print(f"Handled: {e}")

w = resilient_worker()
next(w)                          # "working"
w.throw(ValueError("boom"))     # prints "Handled: boom", then yields "working"
next(w)                          # "working" — continua a funzionare!

La semantica esatta (dalla PEP 342):

  • Se il generatore cattura l’eccezione e produce un altro valore, quello e’ il valore di ritorno di throw()
  • Se non la cattura, l’eccezione si propaga al chiamante
  • Se il generatore e’ gia’ chiuso, throw() solleva direttamente l’eccezione

close() — terminazione pulita

close() inietta GeneratorExit nel punto di sospensione. Il generatore deve terminare (facendo return o lasciando propagare GeneratorExit). Se fa yield dopo GeneratorExit, Python solleva RuntimeError.

def resource_holder():
    print("Setup: opening connection")
    try:
        while True:
            yield "ready"
    except GeneratorExit:
        print("Teardown: closing connection")
        # NON fare yield qui! → RuntimeError

r = resource_holder()
next(r)     # "ready", stampa "Setup: opening connection"
r.close()   # prints "Teardown: closing connection"

L’implementazione di close() dalla PEP 342 e’ essenzialmente questa:

def close(self):
    try:
        self.throw(GeneratorExit)
    except (GeneratorExit, StopIteration):
        pass
    else:
        raise RuntimeError("generator ignored GeneratorExit")

Garbage collection e __del__

La PEP 342 aggiunge un altro dettaglio cruciale: g.__del__() chiama g.close(). Questo significa che quando un generatore viene garbage-collected, il suo codice di cleanup (nei blocchi finally o except GeneratorExit) viene comunque eseguito. Non c’e’ piu’ il rischio della PEP 255 dove i finally potevano non essere eseguiti.

Perche’ questo e’ fondamentale per asyncio

Questo e’ esattamente il meccanismo con cui asyncio cancella un Task:

# Internamente, asyncio fa qualcosa come:
task._coro.throw(asyncio.CancelledError())

Quando fai task.cancel(), asyncio inietta un CancelledError nella coroutine. La coroutine puo’ catturarlo per fare cleanup, oppure lasciarlo propagare.

Una coroutine non e’ solo sospendibile. Ha un ciclo di vita controllabile dall’esterno.


6. Il problema: composizione incompleta (PEP 380)

La PEP 380 (Python 3.3) risolve un problema fondamentale: come comporre generatori senza perdere funzionalita'.

Il problema

def fermentation(days):
    for d in range(days, 0, -1):
        yield d

def brew(style):
    for v in fermentation(3):
        yield v  # ← just pass-through!
    yield f"🍺 {style} ready!"

Sembra funzionare per i valori, ma la delega e’ incompleta. Vediamo dove si rompe.

Problema 1: send() non passa attraverso il for loop

Immaginiamo una fermentation che puo’ essere interrotta con send("stop"):

def fermentation(days):
    """Fermentation, can be stopped with send('stop')"""
    for d in range(days, 0, -1):
        command = yield d
        if command == "stop":
            return d  # remaining days

def brew():
    """Brew sequence: fermentation → ready"""
    yield "Start brewing"
    for v in fermentation(3):
        yield v
    yield "🍺 Beer ready!"

g = brew()
next(g)            # → "Start brewing"
next(g)            # → 3
next(g)            # → 2
next(g)            # → 1
g.send("stop")     # Want to abort...
                   # → "🍺 Beer ready!"  ← fermentation doesn't stop!

send("stop") riprende brew() al suo yield v. Poi il for loop chiama next() su fermentation() — cioe’ fermentation.send(None). Il sub-generatore riceve None, non "stop". A questo punto la fermentation e’ gia’ esaurita (3, 2, 1), il for termina, e la birra viene servita — anche se volevamo fermarla.

Il for loop non ha modo di inoltrare il valore ricevuto da send() al sub-generatore. E’ la scelta naturale per delegare, eppure si rompe non appena serve comunicazione bidirezionale.

Vedremo nella sezione sulla soluzione come yield from risolve questo problema:

def brew():
    yield "Start brewing"
    remaining = yield from fermentation(3)
    if remaining:
        yield f"🛑 Stopped at day {remaining}"
    else:
        yield "🍺 Beer ready!"

g = brew()
next(g)            # → "Start brewing"
next(g)            # → 3
next(g)            # → 2
next(g)            # → 1
g.send("stop")     # → "🛑 Stopped at day 1" ✓

Con yield from, send("stop") arriva direttamente a fermentation(). Il sub-generatore esegue return 1, e yield from cattura il valore di ritorno in remaining. brew() puo’ ora decidere cosa fare — in questo caso mostra il messaggio di stop. Due funzionalita’ impossibili con il for loop: propagazione di send() e cattura del return value.

Problema 2: throw() non viene inoltrato

def sub_with_error_handling():
    while True:
        try:
            yield "ready"
        except ValueError as e:
            print(f"  sub caught: {e}")
            yield "recovered"

Proviamo a delegare e poi fare throw():

def manual_delegate_send():
    sub = sub_with_error_handling()
    value = next(sub)
    while True:
        sent = yield value
        value = sub.send(sent)

g = manual_delegate_send()
next(g)                          # "ready"
g.throw(ValueError("boom"))     # ValueError non catturato!

Cosa e’ successo? throw(ValueError) inietta l’eccezione nel punto di yield di manual_delegate_send — non di sub. Il delegante non ha try/except, quindi l’eccezione propaga al caller. Il sub-generatore non la vede mai, nonostante sappia gestirla.

Il wiring manuale e’ un incubo

Possiamo far funzionare throw() aggiungendo il wiring a mano:

def manual_delegate_send_throw():
    sub = sub_with_error_handling()
    value = next(sub)
    while True:
        try:
            sent = yield value
            value = sub.send(sent)
        except BaseException as e:
            try:
                value = sub.throw(e)
            except StopIteration:
                return

Funziona! Ma dovremmo anche gestire close()GeneratorExit, il valore di ritorno del sub-generatore via StopIteration.value, il caso in cui il sub non abbia throw()… Stiamo riscrivendo l’espansione completa di yield from (sezione 8) — circa 30 righe di codice fragile e pieno di corner case.

Come dice la PEP 380: “A piece of code containing a yield cannot be factored out and put into a separate function in the same way as other code.” E ancora: “handling send() and throw() becomes very complicated, and it is tricky to handle all the corner cases correctly.”

La soluzione: yield from

def brew(style):
    yield from fermentation(3)
    yield f"🍺 {style} ready!"

list(brew("Pilsner"))    # [3, 2, 1, "🍺 Pilsner ready!"]
list(brew("IPA"))        # [3, 2, 1, "🍺 IPA ready!"]

Una riga di delega. yield from non e’ zucchero sintattico per un ciclo for. E’ un tunnel bidirezionale trasparente tra il chiamante e il sub-generatore. Due stili diversi mostrano la riusabilita’ della composizione.

Il principio di refactoring (PEP 380)

La PEP 380 enuncia un principio fondamentale: “It should be possible to take a section of code containing one or more yield expressions, move it into a separate function…and call the new function using a yield from expression. The behaviour of the resulting compound generator should be…the same as the original unfactored generator in all situations, including calls to __next__(), send(), throw() and close().”

Verifichiamolo.

send() passa attraverso yield from

# fermentation() is the same as before (accepts send("stop"))

def brew():
    yield "Start brewing"
    remaining = yield from fermentation(3)
    if remaining:
        yield f"🛑 Stopped at day {remaining}"
    else:
        yield "🍺 Beer ready!"

g = brew()
next(g)            # → "Start brewing"
next(g)            # → 3  — fermentation() responds
next(g)            # → 2  — still fermentation(), brew() is transparent
next(g)            # → 1  — still fermentation()
g.send("stop")     # → "🛑 Stopped at day 1" — send() reaches fermentation() directly!

Lo stesso send("stop") che prima veniva ignorato dal for loop, ora attraversa brew() e arriva direttamente a fermentation(). brew() e’ sparita come intermediario.

throw() passa attraverso yield from

def inner_with_handling():
    while True:
        try:
            yield "ready"
        except ValueError as e:
            print(f"  inner caught: {e}")
            yield "recovered"

def outer_delegating():
    yield from inner_with_handling()

g = outer_delegating()
next(g)                              # "ready"
g.throw(ValueError("boom"))          # "recovered" — inner ha catturato!
next(g)                              # "ready" — continua

Lo stesso throw(ValueError) che prima rimbalzava sul delegante senza raggiungere il sub, ora attraversa outer_delegating() e arriva direttamente a inner_with_handling().

Verifica: close() passa attraverso yield from

def inner_with_cleanup():
    try:
        yield "working"
    except GeneratorExit:
        print("  inner: GeneratorExit received, cleaning up")

def outer_delegating():
    yield from inner_with_cleanup()

g = outer_delegating()
next(g)    # "working"
g.close()  # prints "inner: GeneratorExit received, cleaning up"

7. Il valore di ritorno: il pezzo mancante

C’e’ un aspetto di yield from che viene spesso trascurato ma che e’ fondamentale per capire await: il meccanismo del valore di ritorno.

return in un generatore

A partire da Python 3.3, un generatore puo’ fare return value. Cosa succede? Python traduce return value in raise StopIteration(value). Il valore e’ accessibile in StopIteration.value:

def sub():
    yield "working"
    return "done"

g = sub()
next(g)  # "working"
try:
    next(g)
except StopIteration as e:
    print(e.value)  # "done" ← return value is inside StopIteration!

Come dice la PEP 380: “The value of the yield from expression is the first argument to the StopIteration exception raised by the iterator when it terminates.”

yield from cattura il valore di ritorno

Ecco il pezzo cruciale: yield from cattura lo StopIteration del sub-generatore e ne estrae il .value:

def fetch_data():
    yield "step 1: loading..."
    yield "step 2: processing..."
    return 42                      # → raise StopIteration(42)

def main():
    result = yield from fetch_data()
    print(f"Risultato: {result}")  # Risultato: 42
    yield result

g = main()
next(g)   # "step 1: loading..."  — fetch_data produce
next(g)   # "step 2: processing..." — fetch_data produce
next(g)   # 42                     — fetch_data fa return, main riprende

Ecco cosa succede passo per passo:

  1. next(g)main() inizia, incontra yield from fetch_data(), delega a fetch_data()
  2. fetch_data() produce "step 1: loading..." — passato al chiamante
  3. next(g)fetch_data() produce "step 2: processing..."
  4. next(g)fetch_data() fa return 42StopIteration(42)
  5. yield from cattura StopIteration, estrae .value (42), lo assegna a result
  6. main() riprende, stampa “Risultato: 42”, poi fa yield 42

Senza questo meccanismo, await non potrebbe restituire risultati. Quando scrivi result = await coroutine(), sotto il cofano avviene esattamente questo.


8. L’espansione completa di yield from

Per capire veramente yield from, dobbiamo guardare la sua espansione completa come definita nella PEP 380. Lo statement:

RESULT = yield from EXPR

e’ semanticamente equivalente a:

_i = iter(EXPR)
try:
    _y = next(_i)                          # starts the sub-iterator
except StopIteration as _e:
    _r = _e.value                          # sub finito subito → cattura return value
else:
    while True:
        try:
            _s = yield _y                  # yield al chiamante, ricevi valore da send()
        except GeneratorExit as _e:
            try:
                _m = _i.close              # propaga close() al sub
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw              # propaga throw() al sub
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)           # sub gestisce l'eccezione → nuovo valore
                except StopIteration as _e:
                    _r = _e.value          # sub finito → cattura return value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)          # send(None) ≡ next()
                else:
                    _y = _i.send(_s)       # inoltra send() al sub
            except StopIteration as _e:
                _r = _e.value              # sub finito → cattura return value
                break
RESULT = _r

Sono circa 30 righe di codice. Analizziamole sezione per sezione.

Inizializzazione (righe 1-5)

_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value

Ottiene un iteratore da EXPR e lo avvia con next(). Se il sub-iteratore termina immediatamente (es. un generatore vuoto), cattura il valore di ritorno e il ciclo non parte.

Il ciclo principale (else)

Il cuore e’ un while True che:

  1. Fa yield _y per passare il valore del sub-iteratore al chiamante
  2. Riceve il risultato di yield (che e’ cio’ che il chiamante passa con send())
  3. Gestisce tre casi in base a cosa succede durante il yield

Caso 1: GeneratorExit (close)

except GeneratorExit as _e:
    try:
        _m = _i.close
    except AttributeError:
        pass
    else:
        _m()
    raise _e

Se il chiamante chiama close() sul generatore delegante, GeneratorExit viene catturato e propagato al sub-iteratore chiamando _i.close(). Poi GeneratorExit viene risollevato.

Caso 2: BaseException (throw)

except BaseException as _e:
    _x = sys.exc_info()
    try:
        _m = _i.throw
    except AttributeError:
        raise _e
    else:
        try:
            _y = _m(*_x)
        except StopIteration as _e:
            _r = _e.value
            break

Se il chiamante chiama throw(), l’eccezione viene propagata al sub-iteratore. Se il sub la gestisce e produce un nuovo valore, il ciclo continua. Se il sub termina (StopIteration), il valore di ritorno viene catturato.

Nota: se il sub-iteratore non ha il metodo throw (es. e’ un semplice iteratore, non un generatore), l’eccezione viene sollevata direttamente.

Caso 3: caso normale (send/next)

else:
    try:
        if _s is None:
            _y = next(_i)
        else:
            _y = _i.send(_s)
    except StopIteration as _e:
        _r = _e.value
        break

Se non ci sono eccezioni, il valore ricevuto dal yield viene inoltrato al sub-iteratore. send(None) viene trattato come next() per compatibilita’ con iteratori semplici che non hanno send().

Perche’ il bytecode

Il bytecode che Python genera per yield from e’ molto piu’ efficiente dell’espansione Python — usa le istruzioni specializzate GET_YIELD_FROM_ITER, SEND, e END_SEND:

import dis

def delegator():
    result = yield from sub_gen()
    return result

dis.dis(delegator)
  LOAD_GLOBAL         sub_gen
  CALL                0
  GET_YIELD_FROM_ITER            # gets the iterator
  LOAD_CONST          None
  SEND                3          # delegation loop (→ END_SEND when done)
  YIELD_VALUE         1          # yield al chiamante
  RESUME              2          # resume point
  JUMP_BACKWARD       5          # torna a SEND
  END_SEND                       # sub finito, StopIteration.value sullo stack
  STORE_FAST          result

Il ciclo SEND → YIELD_VALUE → RESUME → JUMP_BACKWARD → SEND e’ l’equivalente compilato di tutta quell’espansione Python. Molto piu’ veloce.


9. Composizione reale: brew sequence

Ora che sappiamo come funziona yield from, possiamo costruire una composizione reale. Tre fasi distinte — controllo qualita’, fermentazione, servizio — ciascuna un generatore indipendente, composte in un’unica sequenza:

def quality_check():
    for ingredient in ["malt", "hops", "water"]:
        yield f"✓ {ingredient}"

def fermentation(days):
    for d in range(days, 0, -1):
        yield d

def brew():
    yield from quality_check()
    yield from fermentation(3)
    yield "🍺 Beer ready!"

list(brew())
# → ["✓ malt", "✓ hops", "✓ water", 3, 2, 1, "🍺 Beer ready!"]

Tre generatori, tre responsabilita’. brew non sa cosa succede dentro ogni fase — delega completamente con yield from. Aggiungere una fase? Basta un altro yield from. Rimuoverne una? Basta cancellare una riga. La composizione e’ dichiarativa: descrive cosa eseguire, non come.

Recap: i mattoni di asyncio

Fermiamoci. Abbiamo tutti i pezzi:

PEP 255 (Py 2.2)  yield        → sospensione
PEP 342 (Py 2.5)  send()       → comunicazione bidirezionale
PEP 342 (Py 2.5)  throw/close  → lifecycle e cancellazione
PEP 380 (Py 3.3)  yield from   → composizione + valore di ritorno

await non nasce dal nulla. E’ l’evoluzione naturale di yield from applicato a oggetti “awaitable”.


10. Due concetti, una sintassi

Prima di passare ad async/await, fermiamoci su una confusione comune: qual e’ la differenza tra un generatore e una coroutine?

Generatore: produce valori

Un generatore e’ una funzione che produce una sequenza di valori, uno alla volta. Il yield e’ un punto di uscita — il chiamante consuma valori con next() o con for:

def fibonacci():
    a, b = 0, 1
    while True:
        yield a          # yields a value
        a, b = b, a + b

# Il chiamante CONSUMA valori
fib = fibonacci()
print(next(fib))  # 0
print(next(fib))  # 1
print(next(fib))  # 1

Il flusso e’ unidirezionale: il generatore produce, il chiamante consuma. Anche se usiamo send(), il caso d’uso principale resta la produzione lazy di sequenze.

Coroutine: consuma valori (e coopera)

Una coroutine e’ una funzione che riceve dati dall’esterno e li elabora. Il yield e’ un punto di ingresso — il chiamante inietta valori con send():

def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # receives a value, yields the average
        total += value
        count += 1
        average = total / count

# Il chiamante INIETTA valori
avg = running_average()
next(avg)              # priming
print(avg.send(10))    # 10.0
print(avg.send(20))    # 15.0
print(avg.send(30))    # 20.0

Il flusso e’ bidirezionale: il chiamante inietta, la coroutine elabora e risponde.

Il problema: stessa sintassi, intenti diversi

Ecco il punto cruciale — Python usa la stessa parola chiave yield per entrambi:

# Is this a generator or a coroutine?
def ambiguous():
    while True:
        x = yield x * 2

Non c’e’ modo di saperlo guardando la firma. Devi leggere il corpo per capire l’intento. E il rischio e’ reale:

def coroutine_accumulator():
    """Coroutine: riceve valori via send()"""
    total = 0
    while True:
        value = yield total
        total += value

# Common mistake: using it as a generator
acc = coroutine_accumulator()
for x in acc:          # BUG: for usa next(), non send()
    print(x)           # prints 0, then TypeError: unsupported operand type(s) for +=
    if x > 100:
        break

for chiama next(), che equivale a send(None). La coroutine riceve None come value e fa total += NoneTypeError. Ma se il corpo fosse diverso, potrebbe funzionare silenziosamente con risultati sbagliati.

Tabella riassuntiva

Aspetto Generatore Coroutine
Scopo Produce valori (lazy) Riceve ed elabora dati
Flusso dati Unidirezionale (out) Bidirezionale (in/out)
Interfaccia next(), for send(), throw(), close()
yield e’… Punto di uscita Punto di ingresso
Esempio tipico Sequenze infinite, pipeline Accumulatori, state machine, scheduler
Sintassi def + yield def + yield (identica!)

L’ultima riga e’ il problema. Stessa sintassi, semantica completamente diversa.

La soluzione: async def (PEP 492)

Python 3.5 risolve questa ambiguita’ con una sintassi dedicata:

# Generatore — produce valori
def my_range(n):
    i = 0
    while i < n:
        yield i
        i += 1

# Coroutine — coopera con l'event loop
async def fetch_data(url):
    response = await aiohttp.get(url)    # yields control
    return await response.json()

Ora e’ impossibile confondere i due:

  • def + yield = generatore
  • async def + await = coroutine
  • Usare next() su una coroutine nativa → TypeError
  • Usare await su un generatore → TypeError
async def coro():
    await asyncio.sleep(1)

c = coro()
next(c)  # TypeError: 'coroutine' object is not an iterator
def gen():
    yield 1

async def wrong():
    await gen()  # TypeError: object generator can't be used in 'await' expression

La separazione sintattica elimina un’intera classe di bug. Ed e’ esattamente la motivazione della PEP 492: “This proposal makes coroutines a proper standalone concept in Python.”


11. Da yield from ad await (PEP 492)

La PEP 492 (Python 3.5) introduce async/await. La motivazione principale: risolvere l’ambiguita’ tra generatori e coroutine.

Come dice la PEP 492: “It is easy to confuse coroutines with regular generators, since they share the same syntax.” E ancora: “Whether or not a function is a coroutine is determined by a presence of yield or yield from statements in its body, which can lead to unobvious errors.”

Da yield from ad await

# Prima di Python 3.5
import asyncio

@asyncio.coroutine
def fetch_old():
    result = yield from asyncio.sleep(0.1)
    return result

# Dopo Python 3.5
async def fetch_new():
    result = await asyncio.sleep(0.1)
    return result

Queste due versioni sono funzionalmente identiche. La differenza e’ che await:

  1. Rende esplicito che questa e’ una coroutine, non un generatore
  2. Valida il suo argomento — accetta solo “awaitable”, non qualsiasi iterabile
  3. Abilita async for e async with, impossibili con la vecchia sintassi

Il decoratore types.coroutine

Per la transizione, Python fornisce @types.coroutine che marca un generatore come compatibile con await:

import types

@types.coroutine
def old_style_sleep():
    yield  # yields control to the event loop

async def new_style():
    await old_style_sleep()  # works thanks to @types.coroutine

Internamente, @types.coroutine imposta il flag CO_ITERABLE_COROUTINE nel code object, distinguendolo da CO_COROUTINE (usato per async def). Questo permette a await di accettare generatori decorati senza trattarli come coroutine native.

StopIterationRuntimeError

Un cambiamento importante: nelle coroutine native (async def), StopIteration che sfugge viene automaticamente convertito in RuntimeError (PEP 479). Questo previene bug sottili dove un StopIteration accidentale veniva interpretato come “fine della coroutine”.


12. Il protocollo __await__

Ecco il pezzo che connette tutto meccanicamente. Un oggetto e’ “awaitable” se:

  1. E’ una coroutine nativa (da async def)
  2. E’ un generatore decorato con @types.coroutine
  3. Ha un metodo __await__() che restituisce un iteratore

Come dice la PEP 492: “every await is suspended by a yield somewhere down the chain of await calls.”

Costruiamo un awaitable custom

class MyFuture:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        yield f"suspending with future"  # questo yield arriva all'event loop
        return self._value               # → StopIteration(self._value)

async def use_custom():
    result = await MyFuture(42)
    return result

Ora guidiamolo manualmente, come farebbe un event loop:

coro = use_custom()

# The event loop starts the coroutine with send(None)
val = coro.send(None)
print(val)  # "suspending with future" — the coroutine is suspended

# The event loop resumes the coroutine
try:
    coro.send(None)
except StopIteration as e:
    print(e.value)  # 42 — the coroutine returned the result

Ecco cosa succede:

  1. coro.send(None) avvia use_custom(), che incontra await MyFuture(42)
  2. await chiama MyFuture(42).__await__(), che restituisce un iteratore (il generatore interno)
  3. L’iteratore fa yield "suspending..." — questo valore risale fino all’event loop
  4. L’event loop chiama coro.send(None) per riprendere
  5. L’iteratore __await__ fa return self._valueStopIteration(42)
  6. await cattura lo StopIteration, estrae .value, lo assegna a result
  7. use_custom() fa return resultStopIteration(42) verso l’event loop

E’ lo stesso identico meccanismo di yield from che cattura StopIteration.value. La catena e’: send()yield per sospendere → StopIteration per restituire il risultato. E’ generatori fino in fondo.

Cosa succede con await asyncio.sleep(1)

1. Event loop avvia la coroutine      →  coro.send(None)
2. Coroutine incontra await            →  chiama __await__() → iteratore
3. Iteratore fa yield di un Future     →  risale all'event loop
4. Event loop riceve il Future         →  registra un timer (1 secondo)
5. Event loop passa ad altre coroutine →  esegue send() su un'altra
6. Timer scade                         →  Future completato
7. Event loop riprende                 →  coro.send(result)
8. Coroutine riparte                   →  stesso stack frame, stesse variabili

E’ lo stesso send() che abbiamo visto nella sezione 4. Lo stesso meccanismo di sospensione e ripresa. L’event loop e’ semplicemente il “chiamante” che decide quando fare send().


13. L’event loop: da zero

Per dimostrare che non c’e’ magia, costruiamone uno in piu’ versioni crescenti di complessita'.

Versione 1: round-robin puro (15 righe)

from collections import deque

coroutines = deque()

def enqueue_coro(coro):
    coroutines.append(coro)

def run():
    while coroutines:
        coro = coroutines.popleft()
        try:
            value = next(coro)        # avanza fino al prossimo yield
            print(value)
            coroutines.append(coro)   # rimette in coda
        except StopIteration:
            pass                      # coroutine finita

Tre operazioni: mantiene una coda di coroutine, ne riprende una alla volta con next(), e se non e’ finita la rimette in coda.

def brew(name):
    for ingredient in ["malt", "hops", "water"]:
        yield f"[{name}] {ingredient}"   # cede il controllo

enqueue_coro(brew("Pilsner"))
enqueue_coro(brew("IPA"))
run()

Output:

[Pilsner] malt
[IPA] malt
[Pilsner] hops
[IPA] hops
[Pilsner] water
[IPA] water

Le coroutine cooperano: eseguono un po’, fanno yield, e l’event loop passa alla successiva. Questo e’ multitasking cooperativo.

Versione 2: con supporto timer

Il vero event loop deve gestire attese temporizzate. La coroutine deve poter dire “risvegliami tra X secondi”:

from collections import deque
import time

class MiniEventLoop:
    def __init__(self):
        self.ready = deque()       # coroutines ready to execute
        self.sleeping = []         # waiting coroutines: (wake_time, coro)

    def call_soon(self, coro):
        self.ready.append(coro)

    def call_later(self, delay, coro):
        self.sleeping.append((time.monotonic() + delay, coro))

    def run(self):
        while self.ready or self.sleeping:
            # Check if any sleeping coroutine needs to wake up
            now = time.monotonic()
            still_sleeping = []
            for wake_time, coro in self.sleeping:
                if now >= wake_time:
                    self.ready.append(coro)    # svegliata!
                else:
                    still_sleeping.append((wake_time, coro))
            self.sleeping = still_sleeping

            # If no coroutine is ready, sleep until next wakeup
            if not self.ready:
                if self.sleeping:
                    nearest = min(t for t, _ in self.sleeping)
                    time.sleep(nearest - time.monotonic())
                continue

            # Execute the next ready coroutine
            coro = self.ready.popleft()
            try:
                result = next(coro)
                if isinstance(result, tuple) and result[0] == "sleep":
                    self.call_later(result[1], coro)  # coroutine requests sleep
                else:
                    self.ready.append(coro)  # put back in queue
            except StopIteration:
                pass  # coroutine finished

Le coroutine comunicano con l’event loop tramite i valori che producono con yield:

def fermenting_batch(name, days):
    for d in range(days, 0, -1):
        print(f"  [{name}] {d} days left")
        yield ("sleep", 0.05)    # "event loop, svegliami tra 50ms"
    print(f"  [{name}] 🍺 ready")

loop = MiniEventLoop()
loop.call_soon(fermenting_batch("Pilsner", 3))
loop.call_soon(fermenting_batch("IPA", 2))
loop.run()

Output:

  [Pilsner] 3 days left
  [IPA] 2 days left
  [Pilsner] 2 days left
  [IPA] 1 days left
  [Pilsner] 1 days left
  [IPA] 🍺 ready
  [Pilsner] 🍺 ready

Le coroutine si alternano, e l’event loop gestisce i timer. Il meccanismo e’ sempre lo stesso: yield per sospendersi, next() per riprendere.

Questo e’ esattamente il pattern di asyncio, in miniatura. asyncio.sleep(1) sotto il cofano fa yield di un Future che l’event loop riprende dopo che il timer scade. Il nostro yield ("sleep", 0.05) e’ la versione primitiva della stessa idea.

Il vero asyncio

asyncio aggiunge:

  • I/O non bloccante tramite select/epoll/kqueue — il loop monitora file descriptor (socket, pipe) e riprende le coroutine quando l’I/O e’ pronto
  • Future e Task — un Task avvolge una coroutine e la guida con send(), un Future rappresenta un risultato che arrivera'
  • Callback — per interoperabilita’ con codice non-async
  • Gestione eccezioni — propagazione di errori, cancellazione di task
  • gather() — esecuzione concorrente di piu’ coroutine

Ma l’idea fondamentale e’ la stessa: un ciclo che chiama send() sulle coroutine quando i loro awaitable sono pronti.


14. asyncio.run(): la forma moderna

Lo stesso concetto del nostro mini event loop, nella forma standard:

import asyncio

async def brew(name):
    for ingredient in ["malt", "hops", "water"]:
        print(f"[{name}] {ingredient}")
        await asyncio.sleep(0)  # yields control to the event loop

async def main():
    await asyncio.gather(
        brew("Pilsner"),
        brew("IPA"),
    )

asyncio.run(main())

Output:

[Pilsner] malt
[IPA] malt
[Pilsner] hops
[IPA] hops
[Pilsner] water
[IPA] water

La corrispondenza:

             Prima                    Ora
             yield                    await
             generatore               coroutine (async def)
             next()/send()            event loop chiama send()
             scheduler manuale        asyncio.run()
             mini loop con deque      asyncio event loop
             yield ("sleep", 0.05)    await asyncio.sleep(0.05)

Il modello non e’ cambiato. E’ diventato uno standard del linguaggio.


15. Async generators (PEP 525)

La PEP 525 (Python 3.6) colma l’ultimo gap: permette di usare yield dentro async def. Ma per capire perche’ serve, dobbiamo prima vedere il problema.

Due superpoteri incompatibili

Finora abbiamo visto due superpoteri di Python:

  1. Generatori: producono valori uno alla volta, lazy, senza caricare tutto in memoria
  2. Coroutine async: fanno I/O senza bloccare, cedendo il controllo all’event loop

Ma cosa succede quando ti servono entrambi? Immaginate di controllare gli ingredienti di un lotto, uno alla volta, dove ogni controllo richiede una chiamata di rete a un fornitore:

import asyncio

async def check_ingredient(batch, offset):
    """Simula un controllo qualita' con latenza di rete"""
    await asyncio.sleep(0.01)  # latenza I/O
    ingredients = [
        {"name": "malt",  "quality": 85},
        {"name": "hops",  "quality": 92},
        {"name": "water", "quality": 78},
        {"name": "yeast", "quality": 95},
        {"name": "sugar", "quality": 88},
    ]
    return ingredients[offset] if offset < len(ingredients) else None

Come produciamo questi controlli uno alla volta al consumer?

Approccio 1: carica tutto in memoria

async def get_all_checks(batch):
    checks = []
    offset = 0
    while True:
        check = await check_ingredient(batch, offset)
        if check is None:
            break
        checks.append(check)
        offset += 1
    return checks  # TUTTO in memoria

async def main():
    all_checks = await get_all_checks("Pilsner")
    for check in all_checks:
        if check["quality"] > 90:
            print(check["name"])

Funziona, ma con un milione di ingredienti ho un milione di dict in memoria. E devo aspettare che tutti i controlli siano completati prima di processare il primo. Abbiamo perso la lazy evaluation — il superpotere dei generatori.

Approccio 2: callback (inversione del controllo)

async def process_checks(batch, callback):
    offset = 0
    while True:
        check = await check_ingredient(batch, offset)
        if check is None:
            break
        await callback(check)
        offset += 1

async def main():
    async def on_check(check):
        if check["quality"] > 90:
            print(check["name"])

    await process_checks("Pilsner", on_check)

Non spreca memoria, ma il flusso di controllo e’ invertito: non e’ piu’ il consumer a tirare i dati, e’ il producer a spingerli. Non posso fare break per fermarmi al primo match. Non posso comporre pipeline. Ho perso il modello pull dei generatori.

Il dilemma: con un generatore sincrono posso fare yield check, ma non posso fare await (il controllo richiede I/O asincrono). Con una coroutine posso fare await, ma non posso fare yield (prima della PEP 525, yield dentro async def era un SyntaxError).

Serviva un modo per avere entrambi: await per l’I/O e yield per la produzione lazy.

La soluzione: yield dentro async def (PEP 525)

async def quality_check(batch):
    offset = 0
    while True:
        check = await check_ingredient(batch, offset)
        if check is None:
            return
        yield check  # await for I/O, yield to produce lazily

Quattro righe. Combina entrambi i superpoteri:

async def main():
    async for check in quality_check("Pilsner"):
        if check["quality"] > 90:
            print(check["name"])
            break  # mi fermo! aclose() chiamato automaticamente

Il consumer tira i dati al suo ritmo (async for chiama __anext__() solo quando e’ pronto). L’I/O e’ non bloccante (await dentro il generatore). E break funziona — async for chiama aclose() automaticamente, che inietta GeneratorExit (stesso meccanismo della sezione 5).

Il problema nella pratica: la classe necessaria prima di PEP 525

Prima della PEP 525, per ottenere lo stesso risultato serviva una classe intera:

import asyncio

# Senza PEP 525: verbose e lento
class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.01)
        self.i += 1
        return self.i

Analizziamo la classe pezzo per pezzo:

  • __init__ — gestisce lo stato manualmente (self.i, self.n). In un generatore, lo stato e’ nel frame.
  • __aiter__ — restituisce se stesso (come iter(g) is g per i generatori sincroni).
  • __anext__ — e’ il cuore: ogni chiamata deve controllare se l’iterazione e’ finita (StopAsyncIteration), fare il lavoro asincrono (await), aggiornare lo stato (self.i += 1), e restituire il valore. E’ una macchina a stati scritta a mano.

L’uso e’ identico a quello di un generatore:

async def main():
    async for val in AsyncCounter(5):
        print(val)  # 1, 2, 3, 4, 5

asyncio.run(main())

Ma per arrivare a questo risultato abbiamo dovuto scrivere una classe con tre metodi, gestire lo stato manualmente, e sollevare StopAsyncIteration al momento giusto. Immaginate un iteratore con logica piu’ complessa — ramificazioni, cleanup, gestione errori. La classe diventa rapidamente ingestibile.

Lo stesso, con async generator

# Con PEP 525: elegante e 2.3x piu' veloce
async def async_counter(n):
    for i in range(1, n + 1):
        await asyncio.sleep(0.01)
        yield i

Stessa funzionalita’. Un quarto del codice. Lo stato (i, n) vive nel frame del generatore — congelato ad ogni yield, esattamente come nella sezione 3. La terminazione e’ implicita: quando il for finisce, il generatore termina, e async for riceve StopAsyncIteration automaticamente.

L’uso dal lato consumer e’ identico:

async def main():
    async for val in async_counter(5):
        print(val)  # 1, 2, 3, 4, 5

asyncio.run(main())

Stesso async for, stesso output. Ma il producer e’ passato da 15 righe di macchina a stati esplicita a 4 righe di logica lineare. E come riporta la PEP 525: “asynchronous generators are 2.3x faster than an equivalent implemented as an asynchronous iterator” — perche’ sono primitive del linguaggio, ottimizzate nel bytecode, senza overhead di dispatch dei metodi.

Il protocollo

Un async generator implementa:

Metodo Equivalente sincrono Descrizione
__aiter__() __iter__() Restituisce se stesso
__anext__() __next__() Awaitable che produce il prossimo valore
asend(val) send(val) Awaitable che inietta un valore
athrow(exc) throw(exc) Awaitable che inietta un’eccezione
aclose() close() Awaitable che inietta GeneratorExit

Possiamo verificarlo:

async def example():
    await asyncio.sleep(0)
    yield 42

ag = example()
print(type(ag))                # <class 'async_generator'>
print(hasattr(ag, '__aiter__')) # True
print(hasattr(ag, '__anext__')) # True
print(hasattr(ag, 'asend'))    # True
print(hasattr(ag, 'athrow'))   # True
print(hasattr(ag, 'aclose'))   # True

Differenze importanti con i generatori sincroni

  1. return con valore e’ vietato: return value in un async generator e’ un SyntaxError. (Nei generatori sincroni e’ permesso dal PEP 380 per il meccanismo StopIteration.value.)

  2. yield from e’ vietato: Non si puo’ usare yield from in un async generator. Si usa async for per consumare altri async generator.

  3. StopAsyncIteration invece di StopIteration: Per segnalare la fine dell’iterazione si usa un’eccezione diversa, perche’ StopIteration nelle coroutine viene convertito in RuntimeError (PEP 479).

  4. Finalizzazione esplicita: I generatori sincroni vengono chiusi automaticamente dal garbage collector tramite __del__close(). Per gli async generator, il cleanup potrebbe richiedere await (es. chiudere una connessione DB), che non si puo’ fare in __del__. Percio’ il cleanup deve essere esplicito tramite aclose(), oppure gestito dall’event loop tramite sys.set_asyncgen_hooks().

# Pattern sicuro: async for gestisce aclose() automaticamente
async for item in async_counter(10):
    process(item)

# Pattern rischioso: se interrompi, chi chiama aclose()?
ag = async_counter(10)
await ag.__anext__()
# ... se dimentichi aclose(), risorse non rilasciate

async for sotto il cofano

Come il for sincrono, anche async for e’ zucchero sintattico:

# Questo:
async for TARGET in ITER:
    BLOCK

# Equivale a:
_iter = type(ITER).__aiter__(ITER)
while True:
    try:
        TARGET = await type(_iter).__anext__(_iter)
    except StopAsyncIteration:
        break
    BLOCK

La differenza chiave: __anext__() restituisce un awaitable, quindi il ciclo puo’ sospendersi ad ogni iterazione per fare I/O asincrono.

Il pezzo mancante: yield from per async generators

Se avete letto la sezione 6, sapete che yield from e’ fondamentale per comporre generatori sincroni: delega completamente al sub-generatore, propagando send(), throw(), close() e catturando il valore di ritorno.

Ma nelle async generator yield from non funziona. Se volete delegare a un altro async generator, dovete fare il pass-through manuale:

async def sub_generator():
    yield 1
    yield 2
    yield 3

# Quello che VORREMMO scrivere:
async def delegating_BROKEN():
    yield from sub_generator()       # SyntaxError!
    # async yield from sub_generator()  # Non esiste (ancora)

# Quello che DOBBIAMO scrivere:
async def delegating_workaround():
    async for item in sub_generator():
        yield item                   # pass-through manuale

Il problema e’ lo stesso della sezione 6 prima di yield from: il async for manuale non propaga asend(), athrow(), aclose() al sub-generatore. E’ solo un pass-through dei valori, non una delegazione completa.

async def accumulator():
    """Async generator che riceve valori via asend()"""
    total = 0
    while True:
        value = yield total
        if value is None:
            return total
        total += value

async def broken_delegation():
    """async for NON propaga asend()"""
    async for item in accumulator():
        yield item
    # Se qualcuno fa broken_delegation().asend(10),
    # il 10 arriva a broken_delegation, NON ad accumulator!

PEP 828: la soluzione in arrivo (Python 3.15)

La PEP 828 (marzo 2026, status: Draft, autore: Peter Bierma) propone di colmare esattamente questo gap, introducendo async yield from:

# PEP 828 — proposta per Python 3.15

async def sub():
    yield 1
    yield 2
    return "done"

async def delegating():
    result = async yield from sub()   # delegazione COMPLETA
    print(result)                     # "done"

async yield from farebbe per gli async generator esattamente quello che yield from fa per i generatori sincroni:

  • Propaga asend() al sub-generatore
  • Propaga athrow() e aclose()
  • Cattura il valore di ritorno via StopAsyncIteration.value

La PEP 828 abilita anche return con valore negli async generator (oggi vietato), propagato tramite StopAsyncIteration.value — lo stesso pattern di StopIteration.value per i generatori sincroni (sezione 7).

La storia si ripete: lo stesso problema di composizione risolto da yield from nel 2012 (PEP 380) viene ora risolto per il mondo async con async yield from (PEP 828). Il pattern e’ identico, la meccanica e’ la stessa.


16. Use case reali

Fin qui la meccanica. Ora tre pattern concreti.

15.1 Da brew sincrona ad asincrona

Ricordate la brew sequence della sezione 9? Ecco la conversione:

import asyncio

# Sync (section 9):
# def quality_check():
#     for ingredient in ["malt", "hops", "water"]:
#         yield f"✓ {ingredient}"

# Async — imagine each check does real I/O:
async def quality_check():
    for ingredient in ["malt", "hops", "water"]:
        await asyncio.sleep(0.1)  # simulates I/O
        yield f"✓ {ingredient}"

async def fermentation(days):
    for d in range(days, 0, -1):
        await asyncio.sleep(1)
        yield d

async def brew():
    async for status in quality_check():
        print(status)
    async for d in fermentation(3):
        print(d)
    print("🍺 Beer ready!")

asyncio.run(brew())
# ✓ malt, ✓ hops, ✓ water, 3, 2, 1, 🍺 Beer ready!

Stessa struttura, stessa logica. Le trasformazioni sono meccaniche:

  • defasync def
  • forasync for
  • yield fromasync for (non esiste async yield from — vedi sezione 15.3)

Il codice che avete gia’ scritto con i generatori diventa naturalmente async. Non dovete reimparare nulla.

15.2 Web scraping concorrente con async generators

import aiohttp
import asyncio

async def fetch_pages(urls):
    async with aiohttp.ClientSession() as session:
        for url in urls:
            async with session.get(url) as resp:
                yield await resp.text()   # yields one page at a time

async def extract_links(pages):
    async for html in pages:
        for link in parse_links(html):   # your parsing function
            yield link                    # backpressure naturale

async def main():
    urls = ["https://example.com/1", "https://example.com/2"]
    async for link in extract_links(fetch_pages(urls)):
        print(link)

Il pattern chiave e’ la backpressure naturale: il consumer (extract_links) tira i dati dal producer (fetch_pages) al suo ritmo. Nessun buffer che esplode, nessuna coda da gestire.

Come funziona? fetch_pages fa yield dopo ogni richiesta HTTP. Il async for in extract_links chiama __anext__() solo quando e’ pronto per il prossimo elemento. Se il consumer e’ lento, il producer si ferma automaticamente al yield — esattamente come un generatore sincrono.

15.3 Testing con generatori

I generatori non sono solo per I/O. Sono strumenti potenti per il testing, anche con la libreria standard unittest.

Pattern 1: setup/teardown con contextmanager + generatore

contextlib.contextmanager trasforma un generatore in un context manager. Lo yield separa setup e teardown:

import unittest
from contextlib import contextmanager

@contextmanager
def db_connection(dsn="test://localhost"):
    """Setup → yield → teardown, anche in caso di errore"""
    conn = create_connection(dsn)      # setup
    try:
        yield conn                     # test uses conn here
    finally:
        conn.rollback()                # teardown: always executed
        conn.close()

Come funziona sotto il cofano? contextmanager chiama next() sul generatore per ottenere il valore (la connessione). Quando il blocco with termina — normalmente o per eccezione — riprende il generatore. Il finally garantisce il cleanup. E’ lo stesso meccanismo di close() della PEP 342: se il generatore viene abbandonato, GeneratorExit viene iniettato e il finally si esegue.

class TestDatabase(unittest.TestCase):
    def test_query_returns_rows(self):
        with db_connection() as conn:
            rows = conn.execute("SELECT * FROM users")
            self.assertEqual(len(rows), 2)
            self.assertEqual(rows[0]["name"], "alice")

    def test_cleanup_happens_on_error(self):
        """Il rollback avviene anche se il test fallisce"""
        with self.assertRaises(RuntimeError):
            with db_connection() as conn:
                raise RuntimeError("simulated error")
        # conn.rollback() e conn.close() sono stati chiamati
        self.assertEqual(conn.state, "closed")

Pattern 2: mock di API paginate

Un generatore simula perfettamente un’API che restituisce risultati a pagine. Ogni next() e’ una “chiamata” all’API:

def fake_paginated_api(pages):
    """Ogni iterazione simula una risposta HTTP con una pagina di risultati"""
    for page in pages:
        yield {"data": page, "has_next": page != pages[-1]}

def consume_all_pages(api):
    """Funzione da testare: consuma tutte le pagine"""
    results = []
    for response in api:
        results.extend(response["data"])
        if not response["has_next"]:
            break
    return results

class TestPaginatedAPI(unittest.TestCase):
    def test_single_page(self):
        api = fake_paginated_api([["a", "b", "c"]])
        self.assertEqual(consume_all_pages(api), ["a", "b", "c"])

    def test_multiple_pages(self):
        api = fake_paginated_api([["a", "b"], ["c", "d"], ["e"]])
        self.assertEqual(consume_all_pages(api), ["a", "b", "c", "d", "e"])

    def test_empty_page(self):
        api = fake_paginated_api([[]])
        self.assertEqual(consume_all_pages(api), [])

Zero dipendenze esterne. Ogni test crea il suo generatore con i dati che gli servono. Determinismo completo.

Pattern 3: sequenza di risposte HTTP per testare retry logic

Un generatore puo’ simulare un servizio instabile — ogni next() restituisce la risposta alla prossima “richiesta”:

from types import SimpleNamespace

def fake_http_responses(*responses):
    """Ogni iterazione simula una risposta HTTP"""
    for status, body in responses:
        yield SimpleNamespace(status=status, json=lambda b=body: b)

class RetryClient:
    """Client con retry automatico — la classe da testare"""
    def __init__(self, transport):
        self.transport = transport

    def fetch(self):
        for response in self.transport:
            if response.status == 200:
                return response.json()
        raise ConnectionError("all retries failed")

class TestRetryClient(unittest.TestCase):
    def test_success_on_first_try(self):
        transport = fake_http_responses((200, {"ok": True}))
        result = RetryClient(transport).fetch()
        self.assertEqual(result, {"ok": True})

    def test_success_after_retries(self):
        transport = fake_http_responses(
            (500, "error"),        # first request: error
            (503, "unavailable"),  # second: still error
            (200, {"ok": True}),   # third: success
        )
        result = RetryClient(transport).fetch()
        self.assertEqual(result, {"ok": True})

    def test_all_retries_fail(self):
        transport = fake_http_responses(
            (500, "error"),
            (503, "unavailable"),
        )
        with self.assertRaises(ConnectionError):
            RetryClient(transport).fetch()

Il generatore ci da’ il controllo totale sulla sequenza di risposte. Possiamo simulare timeout, errori intermittenti, risposte malformate — tutto senza toccare la rete. Il test e’ veloce, deterministico, e leggibile: la sequenza di risposte e’ la specifica del test.

Gli stessi pattern con pytest

Riscriviamo gli stessi tre pattern con pytest per confrontare la leggibilita'.

Pattern 1: fixture come generatore

In pytest le fixture sono generatori — il framework gestisce yield nativamente:

import pytest

@pytest.fixture
def db_connection():
    conn = create_connection("test://localhost")  # setup
    yield conn                                     # test uses conn here
    conn.rollback()                                # teardown
    conn.close()                                   # always executed

def test_query_returns_rows(db_connection):        # ← injected automatically
    rows = db_connection.execute("SELECT * FROM users")
    assert len(rows) == 2
    assert rows[0]["name"] == "alice"

def test_connection_state_is_open(db_connection):
    assert db_connection.state == "open"

Confronto con unittest:

  • unittest: la fixture e’ un @contextmanager che il test usa esplicitamente con with
  • pytest: la fixture e’ un generatore “nudo”, il framework la inietta come argomento del test

Il meccanismo e’ lo stesso (generatore con yield), ma pytest nasconde il wiring. Il test non sa nemmeno che sta usando un generatore — riceve solo conn.

Pattern 2: mock di API paginate

def test_single_page():
    api = fake_paginated_api([["a", "b", "c"]])
    assert consume_all_pages(api) == ["a", "b", "c"]

def test_multiple_pages():
    api = fake_paginated_api([["a", "b"], ["c", "d"], ["e"]])
    assert consume_all_pages(api) == ["a", "b", "c", "d", "e"]

def test_empty_page():
    api = fake_paginated_api([[]])
    assert consume_all_pages(api) == []

Confronto:

  • unittest: self.assertEqual(consume_all_pages(api), ["a", "b", "c"])
  • pytest: assert consume_all_pages(api) == ["a", "b", "c"]

Stessa logica, meno boilerplate. Niente self, niente assertEqual — solo assert.

Pattern 3: retry logic

def test_success_after_retries():
    transport = fake_http_responses(
        (500, "error"),
        (503, "unavailable"),
        (200, {"ok": True}),
    )
    assert RetryClient(transport).fetch() == {"ok": True}

def test_all_retries_fail():
    transport = fake_http_responses((500, "error"), (503, "unavailable"))
    with pytest.raises(ConnectionError):
        RetryClient(transport).fetch()

Confronto:

  • unittest: with self.assertRaises(ConnectionError):
  • pytest: with pytest.raises(ConnectionError):

Quasi identico, ma pytest.raises offre anche accesso al messaggio di errore.

Bonus pytest: fixture parametrizzata

Pytest aggiunge una cosa che con unittest richiede molto piu’ codice — le fixture parametrizzate:

@pytest.fixture(params=["test://db1", "test://db2", "test://db3"])
def multi_db(request):
    conn = create_connection(request.param)
    yield conn
    conn.rollback()
    conn.close()

def test_connection_dsn(multi_db):
    assert multi_db.dsn.startswith("test://")

Questo genera tre test automaticamente — uno per ogni DSN. Lo stesso generatore, tre configurazioni diverse. Con unittest servirebbero tre metodi separati o un subTest.

Confronto riassuntivo

Aspetto unittest pytest
Fixture setup/teardown @contextmanager + with esplicito @pytest.fixture + yield + injection
Asserzioni self.assertEqual(a, b) assert a == b
Eccezioni attese self.assertRaises(Exc) pytest.raises(Exc)
Parametrizzazione subTest o metodi multipli @pytest.fixture(params=[...])
Boilerplate class TestX(unittest.TestCase): Funzioni libere
Dipendenze Libreria standard Dipendenza esterna
Generatori: ruolo Esplicito (with, for) Implicito (il framework li guida)

Il punto chiave: il generatore e’ lo stesso in entrambi i casi. fake_paginated_api e fake_http_responses non cambiano. Cambia solo come il framework di test interagisce con le fixture — pytest sfrutta i generatori in modo piu’ trasparente, unittest richiede piu’ wiring esplicito.


17. Riepilogo e filo narrativo

Ecco l’intero viaggio in una tabella:

PEP Python Concetto Cosa ci da' Meccanismo
255 2.2 yield Sospensione YIELD_VALUE congela il frame
342 2.5 send() Comunicazione bidirezionale yield diventa espressione
342 2.5 throw()/close() Lifecycle e cancellazione GeneratorExit, iniezione eccezioni
380 3.3 yield from Composizione + return value StopIteration.value, tunnel bidirezionale
492 3.5 async/await Sintassi dedicata, disambiguazione __await__ protocol, CO_COROUTINE flag
525 3.6 Async generators yield in async def __aiter__/__anext__/asend/athrow/aclose
Event loop Orchestrazione automatica send() guidato da I/O readiness

Il filo narrativo

2001  PEP 255  Python 2.2    yield (sospensione)
                                │
                                ▼
2005  PEP 342  Python 2.5    send() (bidirezionalita')
                              throw/close (lifecycle)
                                │
                                ▼
2012  PEP 380  Python 3.3    yield from (composizione + return value)
                              StopIteration.value → il ponte verso await
                                │
                                ▼
2015  PEP 492  Python 3.5    async/await (__await__ protocol)
                              event loop (send() automatizzato su I/O readiness)
                                │
                                ▼
2016  PEP 525  Python 3.6    async generators (yield + await insieme)
                              il cerchio si chiude

Ogni concetto e’ un gradino. Niente salti. Niente magia. 14 anni di evoluzione inevitabile.

async/await non e’ magia. E’ il risultato di 14 anni di evoluzione del modello dei generatori.

Una coroutine non e’ una funzione che gira in parallelo. E’ una funzione che decide quando fermarsi per lasciare spazio alle altre.


Basato sul talk “Dai Generatori ad Asyncio” di Carlo Bertini e Francesco Panico — PyCon Italia 2026. Riferimenti: PEP 255, PEP 342, PEP 380, PEP 479, PEP 492, PEP 525, documentazione Python 3.