File: //var/opt/nydus/ops/primordial/__pycache__/context.cpython-39.pyc
a
�,�h� � @ st d dl Z d dlmZ d dlmZmZmZ G dd� d�Ze� Zed eee ef eee ef ee d�dd��Z
dS )
� N)�contextmanager)�Any�Optional�Mappingc sl e Zd ZdZdd� Zeed�dd�Zeed�� fdd �Zed
�dd�Z eed�d
d�Z
ed
�dd�Z� ZS )�Contexta�
Context instances can be used to hold state and values during the execution life cycle of a
python application without needing to change the existing API of various interacting classes.
State/values can be updated in one part of the application and be visible in another part
of the application without having to pass this state through the various methods of the call stack.
Context also supports maintaining global and thread-specific state, which can be very useful
in multi-threaded applications like Flask/Falcon based web applications or the NES Journal reader.
State stored at the global level is visible in every concurrently executing thread, while
thread-specific state is isolated to the corresponding thread of execution. This is useful in a
multi-threaded application like a web app, where each incoming request is processed by a separate
thread and things like request headers and the authenticated user context are thread-specific and
isolated to the thread handling a particular request.
Example usage:
-- In the main thread of an application's start-up code we might want to inject some global state
like so:
# In some start-up file called app.py
from primordial.context import Context
MY_APP_CTX = Context()
# Instantiate some shared object
jwt_fetcher = JWTFetcher(some_config)
MY_APP_CTX.set_global('jwt_fetcher', jwt_fetcher)
# In a thread that's handling a particular HTTP request
# handle_req.py
from app import MY_APP_CTX
MY_APP_CTX.user = User()
MY_APP_CTX.token = copy_token_from_header()
# In a third file somewhere down the line of request processing
# some_file_called_by_controller.py
from app import MY_APP_CTX
def some_func():
    # All of these are available here without being passed in.
    # some_func's signature did not need to change.
    MY_APP_CTX.jwt_fetcher.get()
    MY_APP_CTX.user.name == 'jDoe'
    MY_APP_CTX.token.is_valid()
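The global versus thread-specific behaviour described above can be illustrated with a minimal sketch. `SketchContext` is a hypothetical stand-in written for this example and is not the library's actual source. It assumes a plain dict for global state, a `threading.local` for per-thread state, and attribute lookup that tries the thread-local store first and then falls back to the global one.

```python
import threading


class SketchContext:
    """Hypothetical stand-in for Context, for illustration only."""

    def __init__(self):
        # Write the two internal stores directly so they bypass __setattr__ below.
        object.__setattr__(self, "_global", {})
        object.__setattr__(self, "_local", threading.local())

    def set_global(self, name, value):
        # Global state lives in a plain dict shared by all threads.
        self._global[name] = value

    def __getattr__(self, name):
        # Called only when normal lookup fails: try thread-local state first,
        # then fall back to global state.
        try:
            return getattr(self._local, name)
        except AttributeError:
            if name in self._global:
                return self._global[name]
            raise

    def __setattr__(self, name, value):
        # Plain attribute assignment is stored per thread.
        setattr(self._local, name, value)


ctx = SketchContext()
ctx.set_global("service", "shared-service")  # visible in every thread
ctx.user = "main-user"                       # visible only in the main thread

seen = {}


def worker():
    seen["service"] = ctx.service            # global state is readable here
    seen["has_user"] = hasattr(ctx, "user")  # the main thread's value is not
    ctx.user = "worker-user"                 # set this thread's own value
    seen["user"] = ctx.user


t = threading.Thread(target=worker)
t.start()
t.join()
```

In this sketch the worker thread reads the global `service` value but does not see the `user` set in the main thread, and the main thread's `user` is unchanged after the worker assigns its own.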
c C s i | _ t�� | _d S �N)�_global� threading�local�_local)�self� r
�C/opt/nydus/tmp/pip-target-wkfpz8uv/lib/python/primordial/context.py�__init__2 s zContext.__init__)�name�returnc C s@ zt | j|�W S ty: || jv r4| j| Y S � Y n0 d S r )�getattrr �AttributeErrorr �r r r
r
r �__getattr__6 s
zContext.__getattr__)r �valuec s( |dv rt � �||�S t| j||� d S )N)r r )�super�__setattr__�setattrr �r r r �� __class__r
r r >