Skip to content

Code reference

API

Internal

UserCodeException

Bases: Exception

Custom Exception for user-defined catched errors.

Parameters:

Name Type Description Default
t str

Traceback returned by the worker.

required
Source code in gibbs/hub.py
18
19
20
21
22
23
24
25
26
class UserCodeException(Exception):
    """Custom Exception for user-defined catched errors.

    Args:
        t (str): Traceback returned by the worker.
    """

    def __init__(self, t: str):
        super().__init__(f"Exception raised in user-defined code. Traceback :\n{t}")

WorkerManager

A helper class that takes care of managing workers. Workers' address can be registered as available, and this class will make sure to return address of workers that are available and alive.

A worker is considered as dead if we didn't receive any heartbeat within a given interval.

Parameters:

Name Type Description Default
heartbeat_interval float

Interval of time (in seconds) after which we consider a worker to be dead.

required
Source code in gibbs/hub.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class WorkerManager:
    """A helper class that takes care of managing workers.
    Workers' address can be registered as available, and this class will make
    sure to return address of workers that are available and alive.

    A worker is considered as dead if we didn't receive any heartbeat within a
    given interval.

    Args:
        heartbeat_interval (float): Interval of time (in seconds) after which we
            consider a worker to be dead.
    """

    def __init__(self, heartbeat_interval: float):
        super().__init__()

        self.heartbeat_t = heartbeat_interval

        self.w_ts = {}
        self.w_access = asyncio.Condition()

    async def reckon(self, address: str):
        """Register the given address as available.

        Args:
            address (str): Address of the worker to register as available.
        """
        async with self.w_access:
            self.w_ts[address] = time.time()
            self.w_access.notify()

    async def get_next_worker(self) -> str:
        """Retrieve the next available and alive worker's address.

        Returns:
            str: Address of the available and alive worker.
        """
        async with self.w_access:
            # Iterate workers until we find one that was alive recently
            w_alive = False
            while not w_alive:
                # If no workers are available, wait...
                if not self.w_ts:
                    await self.w_access.wait()

                address, ts = self.w_ts.popitem()
                w_alive = time.time() - ts < self.heartbeat_t

            return address

reckon(address) async

Register the given address as available.

Parameters:

Name Type Description Default
address str

Address of the worker to register as available.

required
Source code in gibbs/hub.py
50
51
52
53
54
55
56
57
58
async def reckon(self, address: str):
    """Register the given address as available.

    Args:
        address (str): Address of the worker to register as available.
    """
    async with self.w_access:
        self.w_ts[address] = time.time()
        self.w_access.notify()

get_next_worker() async

Retrieve the next available and alive worker's address.

Returns:

Name Type Description
str str

Address of the available and alive worker.

Source code in gibbs/hub.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
async def get_next_worker(self) -> str:
    """Retrieve the next available and alive worker's address.

    Returns:
        str: Address of the available and alive worker.
    """
    async with self.w_access:
        # Iterate workers until we find one that was alive recently
        w_alive = False
        while not w_alive:
            # If no workers are available, wait...
            if not self.w_ts:
                await self.w_access.wait()

            address, ts = self.w_ts.popitem()
            w_alive = time.time() - ts < self.heartbeat_t

        return address

RequestManager

A helper class that takes care of storing responses and waiting for the right response.

Parameters:

Name Type Description Default
resp_buffer_size int

Maximum size of the response buffer.

required
Source code in gibbs/hub.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class RequestManager:
    """A helper class that takes care of storing responses and waiting for the
    right response.

    Args:
        resp_buffer_size (int): Maximum size of the response buffer.
    """

    def __init__(self, resp_buffer_size: int):
        super().__init__()

        self.resp_buffer_size = resp_buffer_size

        self.responses = {}
        self.req_states = {}

    def pin(self, req_id: str):
        """Pin a request ID. This is a necessary step when sending a request,
        so that the request can be awaited until a response is received.
        This method should be called for each `req_id` before calling `wait_for`.

        Args:
            req_id (str): Request unique identifier.
        """
        self.req_states[req_id] = asyncio.Event()

        # Ensure we don't store too many requests
        if len(self.req_states) > self.resp_buffer_size:
            # If it's the case, forget the oldest one
            k = list(self.req_states.keys())[0]
            logger.warning(f"Response buffer overflow (>{self.resp_buffer_size}). Forgetting oldest request : {k}")
            self.req_states.pop(k)
            self.responses.pop(k, None)

    async def wait_for(self, req_id: str) -> Tuple[int, Any]:
        """Async method that waits until we received the response corresponding
        to the given request ID.

        The method `pin` should be called before waiting with this method.

        Args:
            req_id (str): Request unique identifier.

        Raises:
            KeyError: Exception raised if the request wasn't registered previously.

        Returns:
            Tuple[int, Any]: Code and content of the received response.
        """
        if req_id not in self.req_states:
            raise KeyError(f"Request #{req_id} was not pinned, or was removed because of buffer overflow")

        # Wait for the receiving loop to receive the response
        await self.req_states[req_id].wait()

        # Once we get it, access the result
        r = self.responses.pop(req_id)

        # Don't forget to remove the event
        self.req_states.pop(req_id)

        return r.code, r.content

    def store(self, req_id: str, code: int, response: Any):
        """Store a response, to be consumed later.

        Args:
            req_id (str): Request unique identifier.
            code (int): Code of the response.
            response (Any): Content of the response.
        """
        # Store the response if the req_id is recognized
        if req_id in self.req_states:
            self.responses[req_id] = Response(code, response)

            # Notify that we received the response
            self.req_states[req_id].set()
        else:
            logger.warning(
                f"Request #{req_id} was previously removed from response buffer. "
                f"Ignoring the response from this request..."
            )

pin(req_id)

Pin a request ID. This is a necessary step when sending a request, so that the request can be awaited until a response is received. This method should be called for each req_id before calling wait_for.

Parameters:

Name Type Description Default
req_id str

Request unique identifier.

required
Source code in gibbs/hub.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def pin(self, req_id: str):
    """Pin a request ID. This is a necessary step when sending a request,
    so that the request can be awaited until a response is received.
    This method should be called for each `req_id` before calling `wait_for`.

    Args:
        req_id (str): Request unique identifier.
    """
    self.req_states[req_id] = asyncio.Event()

    # Ensure we don't store too many requests
    if len(self.req_states) > self.resp_buffer_size:
        # If it's the case, forget the oldest one
        k = list(self.req_states.keys())[0]
        logger.warning(f"Response buffer overflow (>{self.resp_buffer_size}). Forgetting oldest request : {k}")
        self.req_states.pop(k)
        self.responses.pop(k, None)

wait_for(req_id) async

Async method that waits until we received the response corresponding to the given request ID.

The method pin should be called before waiting with this method.

Parameters:

Name Type Description Default
req_id str

Request unique identifier.

required

Raises:

Type Description
KeyError

Exception raised if the request wasn't registered previously.

Returns:

Type Description
Tuple[int, Any]

Tuple[int, Any]: Code and content of the received response.

Source code in gibbs/hub.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
async def wait_for(self, req_id: str) -> Tuple[int, Any]:
    """Async method that waits until we received the response corresponding
    to the given request ID.

    The method `pin` should be called before waiting with this method.

    Args:
        req_id (str): Request unique identifier.

    Raises:
        KeyError: Exception raised if the request wasn't registered previously.

    Returns:
        Tuple[int, Any]: Code and content of the received response.
    """
    if req_id not in self.req_states:
        raise KeyError(f"Request #{req_id} was not pinned, or was removed because of buffer overflow")

    # Wait for the receiving loop to receive the response
    await self.req_states[req_id].wait()

    # Once we get it, access the result
    r = self.responses.pop(req_id)

    # Don't forget to remove the event
    self.req_states.pop(req_id)

    return r.code, r.content

store(req_id, code, response)

Store a response, to be consumed later.

Parameters:

Name Type Description Default
req_id str

Request unique identifier.

required
code int

Code of the response.

required
response Any

Content of the response.

required
Source code in gibbs/hub.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def store(self, req_id: str, code: int, response: Any):
    """Store a response, to be consumed later.

    Args:
        req_id (str): Request unique identifier.
        code (int): Code of the response.
        response (Any): Content of the response.
    """
    # Store the response if the req_id is recognized
    if req_id in self.req_states:
        self.responses[req_id] = Response(code, response)

        # Notify that we received the response
        self.req_states[req_id].set()
    else:
        logger.warning(
            f"Request #{req_id} was previously removed from response buffer. "
            f"Ignoring the response from this request..."
        )

Constants

RESPONSE_BUFFER_SIZE: int = 4096 module-attribute

DEFAULT_PORT: int = 5019 module-attribute

DEFAULT_HEARTBEAT_INTERVAL: float = 1 module-attribute

DEFAULT_RESET_AFTER_N_MISS: int = 2 module-attribute

MS: int = 1000 module-attribute

CODE_SUCCESS: int = 0 module-attribute

CODE_FAILURE: int = 1 module-attribute

PING: bytes = b'' module-attribute

PONG: bytes = b'' module-attribute