diff --git a/examples/01-minimal/example.py b/examples/01-minimal/example.py index 06ddbb8f..2e11f5df 100644 --- a/examples/01-minimal/example.py +++ b/examples/01-minimal/example.py @@ -1,7 +1,83 @@ +import logging import kopf +import dataclasses + + +# @kopf.on.login() +# def delayed_k3s(**_): +# conn = kopf.login_via_pykube(logger=logging.getLogger('xxx')) +# if conn: +# return dataclasses.replace(conn, server=conn.server.rsplit(':', 1)[0] + ':11223') @kopf.on.create('kopfexamples') -def create_fn(spec, **kwargs): - print(f"And here we are! Creating: {spec}") - return {'message': 'hello world'} # will be the new status +@kopf.on.update('kopfexamples') +def create_fn(meta, spec, reason, logger, **kwargs): + rv = meta.get('resourceVersion') + logger.warning(f">>> {rv=} And here we are! {reason=}: {spec}") + + +# @kopf.on.create('kopfexamples') +# def create_fn2(spec, **kwargs): +# print(f"And here we are! Creating2: {spec}") + +""" +======================================================================================================================= +Trigger with (delete the object first!): +$ kubectl apply -f examples/obj.yaml && sleep 1 && kubectl patch -f examples/obj.yaml --type merge -p '{"spec": {"field": 2}}' +======================================================================================================================= + +The timeline of a misbehaving operator (with an artificial latency of 3 seconds): + + /-- kubectl creates an object (state a=s0) + | ... sleep 1s + | /-- kubectl patches the spec.field with the patch "p1", creates state "b"=s0+p1 + | | /-- Kopf patches with annotations (state c=s0+p1+p2) + | | | /-- Kopf patches with annotations (the same state d=s0+p1+p2+p3, d==c) + ↓ ↓ | | +----+-//-aaaaabbbbbbbcccccdddddddddddddddddd--> which state is stored in kubernetes + ↓ ↓ ↑↓ ↑↓ + | | || |\----3s----\ + | | |\---+3s----\ | + | \----3s+---\| | | + \----3s----\| || | | + ↓↑ ↓↑ ↓ ↓ +----+-//------------aaaaabbbbbbbbcccccdddddd--> which state is seen by the operator + ↓ ↓↑ ↓↑ ↓ ↓ + | || || | \-- Kopf gets the state "d"=s0+p1+p2+p3, sees the annotations, goes idle. + | || || \-- Kopf gets the state "c"=s0+p1+p2, sees the annotations, goes idle. + | || || + | || |\-- Kopf reacts, executes handlers (2ND TIME), adds annotations with a patch (p3) + | || \-- Kopf gets the state "b"=s0+p1 with NO annotations of "p2" yet. + | || !BUG!: "c"=s0+p1+p2 is not seen yet, though "c"/"p2" exists by now! + | || + | |\-- Kopf reacts, executes handlers (1ST TIME), adds annotations with a patch (p2) + | \-- Kopf gets a watch-event (state a) + \-- Kopf starts watching the resource + +A fix with consistency tracking (with an artificial latency of 3 seconds): + + /-- kubectl creates an object (state a=s0) + | ... sleep 1s + | /-- kubectl patches the spec.field with the patch "p1", creates state "b"=s0+p1 + | | /-- Kopf patches with annotations (state c=s0+p1+p2) + ↓ ↓ | +----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes + ↓ ↓ ↑↓ + | | |\----3s----\ + | \----3s+---\ | + \----3s----\| | | + ↓↑ ↓ ↓ +----+-//------------aaaaabbbbbbbbcccccc-> which state is seen by the operator + ↓ ↓↑⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶↓ Kopf's own patch "p2" enables the consistency expectation for 5s OR version "c" + | || | | + | || | \-- Kopf gets a consistent state "c"=s0+p1+p2 as expected, thus goes idle. + | || | + | || \-- Kopf executes ONLY the low-level handlers over the state "b"=s0+p1. + | || \~~~~~~⨳ inconsistency mode: wait until a new event (then discard it) OR timeout (then process it) + | || + | |\-- Kopf reacts, executes handlers, adds annotations with a patch (p2) + | \-- Kopf gets a watch-event (state a) + \-- Kopf starts watching the resource + +""" diff --git a/kopf/_core/reactor/processing.py b/kopf/_core/reactor/processing.py index 6ee9cef4..7f5dd50f 100644 --- a/kopf/_core/reactor/processing.py +++ b/kopf/_core/reactor/processing.py @@ -308,6 +308,7 @@ async def process_resource_causes( # If the wait exceeds its time and no new consistent events arrive, then fake the consistency. # However, if a patch is accumulated by now, skip waiting and apply it instantly (by exiting). # In that case, we are guaranteed to be inconsistent, so also skip the state-dependent handlers. + unslept: Optional[float] = None consistency_is_required = changing_cause is not None consistency_is_achieved = consistency_time is None # i.e. preexisting consistency if consistency_is_required and not consistency_is_achieved and not patch and consistency_time: @@ -316,6 +317,8 @@ async def process_resource_causes( consistency_is_achieved = unslept is None # "woke up" vs. "timed out" if consistency_is_required and not consistency_is_achieved: changing_cause = None # exit to PATCHing and/or re-iterating over new events. + rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion') + local_logger.debug(f'>>> {rv=} {consistency_is_required=} {consistency_is_achieved=} {unslept=} {changing_cause=}') # Now, the consistency is either pre-proven (by receiving or not expecting any resource version) # or implied (by exceeding the allowed consistency-waiting timeout while getting no new events). diff --git a/kopf/_core/reactor/queueing.py b/kopf/_core/reactor/queueing.py index ff9fc94d..58faa64b 100644 --- a/kopf/_core/reactor/queueing.py +++ b/kopf/_core/reactor/queueing.py @@ -172,6 +172,7 @@ def exception_handler(exc: BaseException) -> None: exception_handler=exception_handler) streams: Streams = {} + loop = asyncio.get_running_loop() try: # Either use the existing object's queue, or create a new one together with the per-object job. # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done. @@ -192,12 +193,21 @@ def exception_handler(exc: BaseException) -> None: if isinstance(raw_event, watching.Bookmark): continue + # TODO: REMOVE: only for debugging! + rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion') + fld = raw_event.get('object', {}).get('spec', {}).get('field') + knd = raw_event.get('object', {}).get('kind') + nam = raw_event.get('object', {}).get('metadata', {}).get('name') + logger.debug(f'STREAM GOT {knd=} {nam=} {rv=} // {fld=} ') + # Multiplex the raw events to per-resource workers/queues. Start the new ones if needed. key: ObjectRef = (resource, get_uid(raw_event)) try: # Feed the worker, as fast as possible, no extra activities. - streams[key].pressure.set() # interrupt current sleeps, if any. - await streams[key].backlog.put(raw_event) + loop.call_later(3.0, streams[key].pressure.set) + loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event) + # streams[key].pressure.set() # interrupt current sleeps, if any. + # await streams[key].backlog.put(raw_event) except KeyError: # Block the operator's readiness for individual resource's index handlers. @@ -211,8 +221,10 @@ def exception_handler(exc: BaseException) -> None: # Start the worker, and feed it initially. Starting can be moderately slow. streams[key] = Stream(backlog=asyncio.Queue(), pressure=asyncio.Event()) - streams[key].pressure.set() # interrupt current sleeps, if any. - await streams[key].backlog.put(raw_event) + # streams[key].pressure.set() # interrupt current sleeps, if any. + # await streams[key].backlog.put(raw_event) + loop.call_later(3.0, streams[key].pressure.set) + loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event) await scheduler.spawn( name=f'worker for {key}', coro=worker( @@ -316,6 +328,13 @@ async def worker( if isinstance(raw_event, EOS): break # out of the worker. + # TODO: REMOVE: only for debugging! + rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion') + fld = raw_event.get('object', {}).get('spec', {}).get('field') + knd = raw_event.get('object', {}).get('kind') + nam = raw_event.get('object', {}).get('metadata', {}).get('name') + logger.debug(f'QUEUED GOT {knd=} {nam=} {rv=} exp={expected_version!r} // {fld=} ') + # Keep track of the resource's consistency for high-level (state-dependent) handlers. # See `settings.persistence.consistency_timeout` for the explanation of consistency. if expected_version is not None and expected_version == get_version(raw_event):