Source code for graft.state

"""
Figure 4: Server states. Followers only respond to requests
from other servers. If a follower receives no communication,
it becomes a candidate and initiates an election. A candidate
that receives votes from a majority of the full cluster becomes
the new leader. Leaders typically operate until they fail.
"""
import abc
import math
import logging
import contextlib
from enum import Enum, auto
from dataclasses import dataclass

from . import log, model

logger = logging.getLogger(__name__)


[docs]class Roles(Enum): # Followers are passive, they only respond to leaders and candidates requests. # If contacted by a client, they redirect it to the leader. FOLLOWER = auto() # When no leader is heard, followers become candidates & submit an election request. CANDIDATE = auto() # Handles all client requests & propagates the requests to replicate on followers. LEADER = auto()
[docs]class TargetIsSenderError(ValueError): """Raised by `BaseController.send` when sender is receiver"""
[docs]@dataclass(frozen=True) class BaseController(abc.ABC): """A controller needs to be frozen and provide a `send` method.""" peer_id: int @property @abc.abstractmethod def peers(self) -> set: pass
[docs] @abc.abstractmethod def send(self, target_peer: int, message: object): if target_peer == self.peer_id: msg = f"Can't send {message=} to ourselves: {self.peer_id}, {target_peer=}" raise TargetIsSenderError(msg)
[docs]class State: def __init__(self): # persistent self.term = 0 # increased monotonically after every election finishes # volatile self.commit_index = 0 # index of highest log entry known to be committed, increased monotonically self.votes_received_from = set() self.log = log.new() self.become_follower()
[docs] def become_follower(self): """Respond to RPCs from candidates and leaders""" self.voted_for = 0 # reset vote anytime we become followers self.role = Roles.FOLLOWER self.heard_leader = False self.votes_received_from.clear() logger.debug("Became follower")
[docs] def become_leader(self, control: BaseController): self.role = Roles.LEADER logger.debug(f"Becoming leader from: {control.peer_id}") # Reinitialized after election # for each server, index of the next log entry to send to that server # (initialized to leader last log index + 1) self.next_index = {p: len(self.log) + 1 for p in control.peers} # for each server, index of highest log entry known to be replicated on server # (initialized to 0, increases monotonically) self.match_index = {p: 0 for p in control.peers} self.votes_received_from.clear() self.heartbeat(control)
[docs] def heartbeat(self, control): """Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods to prevent election timeouts (§5.2) :param control: :return: """ self._leader_append_entries(control)
[docs] def append(self, control, item): """If command received from client: append entry to local log, respond after entry applied to state machine (§5.3) :param control: :param item: :return: """ entry = model.Entry(self.term, item) self._leader_append_entries(control, entry)
def _leader_append_entries(self, control, *entries): if self.role != Roles.LEADER: return after_index = len(self.log) after_term = self.log[after_index].term if after_index else self.term self.log = log.append(self.log, model.Index(after_term, after_index), *entries) for follower in control.peers: self._leader_append_entries_on_follower(follower, control) def _leader_append_entries_on_follower(self, peer_id: int, control: BaseController): if self.role != Roles.LEADER: return size = len(self.log) self.commit_index = size next_follower_index = self.next_index[peer_id] # If last log index ≥ nextIndex for a follower: send # AppendEntries RPC with log entries starting at nextIndex if size and size >= next_follower_index: start_range = next_follower_index or 1 new_entries = (self.log[i] for i in range(start_range, size+1)) else: new_entries = tuple() after_i = max(next_follower_index - 1, 0) if size else 0 after_t = self.log[after_i].term if after_i else self.term message = model.AppendEntriesRequest( term=self.term, # leader’s term sender=control.peer_id, # so follower can redirect clients after=model.Index(after_t, after_i), entries=tuple(new_entries), leader_commit=self.commit_index, ) control.send(peer_id, message) def _append(self, after, entries): with contextlib.suppress(log.AppendError): self.log = log.append(self.log, after, *entries) return True return False
[docs] def timeout(self, control: BaseController): """If a follower receives no communication, it becomes a candidate and initiates an election. On conversion to candidate, start election: • Increment currentTerm • Vote for self • Reset election timer • Send RequestVote RPCs to all other servers • If votes received from majority of servers: become leader • If AppendEntries RPC received from new leader: convert to follower • If election timeout elapses: start new election """ logger.debug(f"Handling timeout") if self.role == Roles.LEADER: return if not self.heard_leader: logger.warning(f"Didnt hear from leader. Calling an election") self.role = Roles.CANDIDATE self.term += 1 # Increment currentTerm candidate = control.peer_id self.voted_for = candidate # Vote for self # Reset election timer self.votes_received_from = {candidate} # Set of votes received # index of highest log entry applied to state machine last_applied = len(self.log) last_applied_term = self.log[last_applied].term if self.log else 0 msg = model.VoteRequest( sender=candidate, term=self.term, last_log_index=model.Index(last_applied_term, last_applied), ) logger.debug(f"Requesting vote {msg}") # Send RequestVote RPCs to all other servers for peer in control.peers: control.send(peer, msg) self.heard_leader = False
@property def _last_log_term(self): return self.log[len(self.log)].term if self.log else 0 def _majority(self, control): if (peersize := len(control.peers)) % 2 == 0: return abs(peersize / 2 + 1) else: return math.ceil(peersize / 2) def _handle_base_message(self, message): """Rules for all Servers - If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine (§5.3) - If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1) """ if message.term > self.term: self.term = message.term logger.debug("Becoming follower from _handle_base_message") self.become_follower()
[docs] def on_election_request(self, control: BaseController, msg: model.VoteRequest): """May only vote for one candidate in a given term (subsequent requests denied) - Wont vote if candidate's logger is not as up-to-date as oneself - Grant vote if last entry in candidate's logger has a greater term than myself. """ self._handle_base_message(msg) # Make sure we've only voted once (or already voted for the sender) can_vote_for_sender = not self.voted_for or self.voted_for == msg.sender updated_log_recvd = True if self.log: # see if log from sender is equal or more up to date than ours logterm_ge_recvd = msg.last_log_index.term >= self._last_log_term log_ge_recvd = msg.last_log_index.key >= len(self.log) updated_log_recvd = logterm_ge_recvd and log_ge_recvd granted = can_vote_for_sender and updated_log_recvd and self.term <= msg.term if granted: self.voted_for = msg.sender response = model.VoteReply( sender=control.peer_id, term=self.term, granted=granted, ) for peer in control.peers: control.send(peer, response)
[docs] def on_election_reply(self, control: BaseController, msg: model.VoteReply): """A candidate that receives votes from a majority of the full cluster becomes the new leader 1. Reply false if term < currentTerm (§5.1) 2. If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) • If votes received from majority of servers: become leader • If AppendEntries RPC received from new leader: convert to follower """ self._handle_base_message(msg) if self.role == Roles.CANDIDATE and msg.granted: self.votes_received_from.add(msg.sender) if len(self.votes_received_from) >= self._majority(control): self.become_leader(control) self.votes_received_from.clear()
[docs] def on_append_entries_request(self, control: BaseController, msg: model.AppendEntriesRequest): """ 1. Reply false if term < currentTerm (§5.1) 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3) 3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3) 4. Append any new entries not already in the log 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) """ self.heard_leader = True self._handle_base_message(msg) if self.term <= msg.term: # If message term is greater or equal as ours, recognize sender as leader if self.role == Roles.CANDIDATE: self.become_follower() success = self._append(msg.after, msg.entries) else: # Our term is stronger than the message one. We must be more up to date. success = False response = model.AppendEntriesReply( sender=control.peer_id, term=self.term, success=success, match_index=min(msg.leader_commit, len(self.log)), ) control.send(msg.sender, response)
[docs] def on_append_entries_reply(self, control: BaseController, msg: model.AppendEntriesReply): self._handle_base_message(msg) if self.role != Roles.LEADER: return # If successful: update nextIndex and matchIndex for follower if msg.success: self.next_index[msg.sender] = msg.match_index + 1 else: # backtrack until we have a match logger.error(f"Append entries failed for {msg.sender} at index {self.next_index[msg.sender]}: {msg}") max_next_index = max(self.next_index[msg.sender] - 1, 0) self.next_index[msg.sender] = min(max_next_index, msg.match_index) logger.error(f"Back tracking and retrying now at {self.next_index[msg.sender]}") self._leader_append_entries_on_follower(msg.sender, control)