Source code for graft.server

import logging
import asyncio
import contextlib
from random import randrange
from types import MappingProxyType
from functools import partial, cached_property

from . import net, state, model

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Bounds, in seconds, of the randomised delay used by ``Server._start_timer``:
# each delay is drawn in 0.01 s steps from [_MIN_TIMEOUT, _MAX_TIMEOUT).
_MIN_TIMEOUT = 5
_MAX_TIMEOUT = 10

# Read-only routing table: the exact class of an inbound message selects the
# unbound ``state.State`` handler that ``Server._dispatch_messages`` will bind
# and enqueue for it.
_MESSAGE_DISPATCHER = MappingProxyType(
    {
        # Election traffic.
        model.VoteRequest: state.State.on_election_request,
        model.VoteReply: state.State.on_election_reply,
        # Append-entries traffic.
        model.AppendEntriesRequest: state.State.on_append_entries_request,
        model.AppendEntriesReply: state.State.on_append_entries_reply,
    }
)


class Server(state.BaseController):
    """Asyncio-driven controller connecting a network endpoint to a state machine.

    Timer ticks, heartbeats and inbound network messages are each wrapped in
    a zero-argument callable and pushed onto one queue. ``_update_state``
    pops and runs them one at a time, so every ``state.State`` handler is
    invoked from that single consumer loop.
    """

    def __init__(self, peer_id: int):
        """Create the network endpoint, the state machine and the event queue.

        Args:
            peer_id: Identifier of this server; forwarded to the base
                controller and to ``net.Network``.
        """
        super().__init__(peer_id)
        self._net = net.Network(peer_id)
        self._state = state.State()
        # Pending state-machine events: callables taking no arguments.
        self._machine_events = asyncio.Queue()

    async def start(self):
        """Start the server.

        Runs the network layer together with the timer, dispatcher, state
        updater and heartbeat loops concurrently. Returns once all of them
        have finished; an exception raised by any of them propagates to the
        caller.
        """
        await asyncio.gather(
            self._net.start(),
            self._start_timer(),
            self._dispatch_messages(),
            self._update_state(),
            self._hearbeat(),
        )

    async def _start_timer(self):
        """Enqueue a ``State.timeout`` event after each randomised delay."""
        timer = partial(state.State.timeout, self._state, self)
        # Loop unconditionally rather than on the truthiness of the drawn
        # delay, so a delay of 0 (possible if the bounds are ever lowered)
        # cannot silently stop the timer for good.
        while True:
            # Seconds, drawn in 0.01 s steps from [_MIN_TIMEOUT, _MAX_TIMEOUT).
            timeout = randrange(_MIN_TIMEOUT * 100, _MAX_TIMEOUT * 100) / 100
            await asyncio.sleep(timeout)
            await self._add_event(timer)

    async def _dispatch_messages(self):
        """Turn received messages into state-machine events.

        Each message is routed by its exact type through
        ``_MESSAGE_DISPATCHER``. Messages whose type has no handler are
        skipped. The loop ends when ``recv()`` yields a falsy value.
        """
        while msg := await self._net.recv():
            handler = _MESSAGE_DISPATCHER.get(type(msg))
            if handler is None:
                # Only the lookup may miss; log the drop instead of hiding it.
                logger.debug(
                    "Ignoring message of unhandled type %s", type(msg).__name__
                )
                continue
            await self._add_event(partial(handler, self._state, self, msg))

    async def _add_event(self, func):
        """Queue ``func`` (a zero-argument callable) for ``_update_state``."""
        await self._machine_events.put(func)

    @cached_property
    def peers(self):
        """Peers as reported by the network layer.

        The value of ``self._net.peers`` is read on first access and cached
        on the instance; rebinding that attribute later is not reflected.
        """
        return self._net.peers

    def send(self, target_peer: int, message: object):
        """Send ``message`` to ``target_peer``.

        Calls the base controller's ``send`` first, then hands the message to
        the network layer.
        """
        super().send(target_peer, message)
        self._net.send(target_peer, message)

    async def _update_state(self):
        """Run queued events one at a time, in arrival order.

        Events are called synchronously. An exception raised by an event
        propagates and ends this loop, as does a falsy queue item.
        """
        while method := await self._machine_events.get():
            try:
                method()
            finally:
                # Mark the item done only after it has actually run, so a
                # ``Queue.join()`` cannot return while an event is in flight.
                self._machine_events.task_done()

    async def _hearbeat(self):
        """Periodically enqueue a ``State.heartbeat`` event.

        Original intent: the leader sends empty requests regularly to prevent
        election timeouts. This loop enqueues the event unconditionally;
        presumably ``State.heartbeat`` checks the role -- TODO confirm.
        """
        heartbeat = partial(state.State.heartbeat, self._state, self)
        # The interval is ``peer_id / 10`` seconds.
        # NOTE(review): a peer_id of 0 gives no delay and would flood the
        # queue -- confirm peer ids start at 1.
        while await asyncio.sleep(self.peer_id/10, result=True):
            await self._add_event(heartbeat)
if __name__ == '__main__':
    # Manual test driver: start one server node and log its state regularly.
    #
    # The goal for this project is to make one server the "leader" and
    # replicate its log on all of the other servers. It will do this by
    # sending messages through the network and processing their replies.
    # You will be able to append new log entries onto the leader log and
    # those entries will just "magically" appear on all of the followers.
    # The leader will be able to bring any follower up to date if its log is
    # missing many entries.

    # import uvloop
    # uvloop.install()  # be fast
    import faulthandler
    faulthandler.enable()

    import argparse
    from graft import transport

    transport.logger.setLevel(logging.INFO)  # tmp: debug too verbose for this module
    net.logger.setLevel(logging.INFO)  # tmp: debug too verbose for this module
    state.logger.setLevel(logging.INFO)  # tmp: debug too verbose for this module

    parser = argparse.ArgumentParser(description='Start server arguments.')
    parser.add_argument('node', type=int, help='Server node to start')
    parsedargs = parser.parse_args()

    from datetime import datetime

    node = parsedargs.node

    def _debug_log(server):
        """Log the server's id, role, term and log size, plus sample entries."""
        size = len(server._state.log)
        logger.info(f"{server.peer_id}, {server._state.role} term: {server._state.term}, {size=}")
        if size:
            # Sample the first entry and the last two, skipping indexes <= 0.
            for i in sorted(filter(lambda x: x > 0, {1, size-1, size})):
                logger.debug(f"Index {i}: {server._state.log[i]}")

    async def test(peer_id):
        """Run a server and, while it is leader, append a timestamp each tick."""
        server = Server(peer_id)
        # Keep a strong reference: the event loop only holds a weak one, so a
        # discarded task may be garbage-collected before it finishes.
        server_task = asyncio.create_task(server.start())  # noqa: F841
        # One tick every ``peer_id`` seconds.
        # NOTE(review): a peer_id of 0 makes this loop spin without delay.
        while await asyncio.sleep(peer_id, result=True):
            if server._state.role == state.Roles.LEADER:
                msg = datetime.now()
                server._state.append(server, msg)
            _debug_log(server)

    asyncio.run(test(node))