From f42da6e7e20ca7dbf613ca229b3349f339f2a79d Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Wed, 14 Feb 2024 13:53:59 -0800 Subject: [PATCH 1/7] cabal.project: temporary source-repository-package stanzas --- cabal.project | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cabal.project b/cabal.project index c4f84c2dd7..0c79b0312f 100644 --- a/cabal.project +++ b/cabal.project @@ -31,3 +31,23 @@ tests: true benchmarks: true import: ./asserts.cabal + +source-repository-package + type: git + location: https://github.com/input-output-hk/ouroboros-network/ + tag: 8ab444850b9d654110cd751a17c21db814bba593 + subdir: monoidal-synchronisation + network-mux + ouroboros-network + ouroboros-network-api + ouroboros-network-framework + ouroboros-network-protocols + ouroboros-network-testing + --sha256: 1vw8r8csa6lq4330bwgddxa4nl2m1pw1ps5y6l7lw2zw3xdlr0hf + +source-repository-package + type: git + location: https://github.com/input-output-hk/io-sim + tag: 85d633d6f9c76ffb678b36396c8fd39b2ff7219e + subdir: io-sim + --sha256: 10f54xg88wq2w7ylg3n33f1yiswd6w4dam7fvl12raiqj0rk21qm From bbc5e144fcfb8e7e2caf128878748338942effbb Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Mon, 5 Feb 2024 13:39:33 -0800 Subject: [PATCH 2/7] consensus-diffusion: ledger peers interface integration --- .../Ouroboros/Consensus/Node.hs | 7 +++++-- .../Ouroboros/Consensus/NodeKernel.hs | 12 ++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index b5a36e5f2e..fddb704e8d 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -550,8 +550,11 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = | (version, blockVersion) <- Map.toList llrnNodeToClientVersions ], Diffusion.daLedgerPeersCtx = - LedgerPeersConsensusInterface - (getPeersFromCurrentLedgerAfterSlot kernel) + LedgerPeersConsensusInterface { + lpGetLatestSlot = getImmTipSlot kernel, + lpGetLedgerPeers = fromMaybe [] <$> getPeersFromCurrentLedger kernel (const True), + lpGetLedgerStateJudgement = getLedgerStateJudgement kernel + } } localRethrowPolicy :: RethrowPolicy diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index e256b1f5c1..b63b7bd6c5 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -18,6 +18,7 @@ module Ouroboros.Consensus.NodeKernel ( , NodeKernel (..) , NodeKernelArgs (..) , TraceForgeEvent (..) 
+ , getImmTipSlot , getMempoolReader , getMempoolWriter , getPeersFromCurrentLedger @@ -662,3 +663,14 @@ getPeersFromCurrentLedgerAfterSlot kernel slotNo = case ledgerTipSlot st of Origin -> False NotOrigin tip -> tip > slotNo + +-- | Retrieve the slot of the immutable tip +getImmTipSlot :: + ( IOLike m + , UpdateLedger blk + ) + => NodeKernel m addrNTN addrNTC blk + -> STM m (WithOrigin SlotNo) +getImmTipSlot kernel = + getTipSlot + <$> ChainDB.getImmutableLedger (getChainDB kernel) From d0e10cc0cfed1b19ee2ec2aa7b59ca769f800105 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Wed, 14 Feb 2024 13:25:38 -0800 Subject: [PATCH 3/7] consensus: the bootstrap GSM --- .../ouroboros-consensus-diffusion.cabal | 2 + .../Ouroboros/Consensus/Network/NodeToNode.hs | 5 +- .../Ouroboros/Consensus/Node.hs | 106 +++- .../Ouroboros/Consensus/Node/GSM.hs | 494 ++++++++++++++++++ .../Ouroboros/Consensus/Node/Tracers.hs | 6 + .../Ouroboros/Consensus/NodeKernel.hs | 125 ++++- .../Test/ThreadNet/Network.hs | 16 +- .../Consensus/PeerSimulator/BlockFetch.hs | 3 +- .../Test/Consensus/PeerSimulator/Run.hs | 4 +- .../bench/ChainSync-client-bench/Main.hs | 2 + .../MiniProtocol/ChainSync/Client.hs | 63 ++- .../Consensus/Storage/ChainDB/Impl/Args.hs | 14 +- .../Test/Util/ChainDB.hs | 4 + .../MiniProtocol/BlockFetch/Client.hs | 3 +- .../MiniProtocol/ChainSync/Client.hs | 13 +- 15 files changed, 790 insertions(+), 70 deletions(-) create mode 100644 ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index 522d8f2bf8..7277d6b2fa 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -56,6 +56,7 @@ library Ouroboros.Consensus.Node.ErrorPolicy Ouroboros.Consensus.Node.Exit Ouroboros.Consensus.Node.ExitPolicy + Ouroboros.Consensus.Node.GSM Ouroboros.Consensus.Node.Recovery Ouroboros.Consensus.Node.RethrowPolicy Ouroboros.Consensus.Node.Tracers @@ -69,6 +70,7 @@ library build-depends: , base >=4.14 && <4.20 , bytestring >=0.10 && <0.13 + , cardano-slotting , cborg ^>=0.2.2 , containers >=0.5 && <0.7 , contra-tracer diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index 96fc64198f..a57fe3a963 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -569,8 +569,9 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe (contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel))) (CsClient.defaultChainDbView (getChainDB kernel)) (getNodeCandidates kernel) + (getNodeIdlers kernel) them - version $ \varCandidate -> do + version $ \varCandidate (startIdling, stopIdling) -> do chainSyncTimeout <- genChainSyncTimeout (r, trailing) <- runPipelinedPeerWithLimits @@ -588,6 +589,8 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe , CsClient.controlMessageSTM , CsClient.headerMetricsTracer = TraceLabelPeer them `contramap` reportHeader , CsClient.varCandidate + , CsClient.startIdling + , CsClient.stopIdling } return 
(ChainSyncInitiatorResult r, trailing) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index fddb704e8d..c7f1c7a29c 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -21,6 +21,7 @@ module Ouroboros.Consensus.Node ( , StdRunNodeArgs (..) , stdBfcSaltIO , stdChainSyncTimeout + , stdGsmAntiThunderingHerdIO , stdKeepAliveRngIO , stdLowLevelRunNodeArgsIO , stdMkChainDbHasFS @@ -66,6 +67,7 @@ import Data.Hashable (Hashable) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isNothing) +import Data.Time (NominalDiffTime) import Data.Typeable (Typeable) import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime hiding (getSystemStart) @@ -82,6 +84,8 @@ import Ouroboros.Consensus.Node.DbLock import Ouroboros.Consensus.Node.DbMarker import Ouroboros.Consensus.Node.ErrorPolicy import Ouroboros.Consensus.Node.ExitPolicy +import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..)) +import qualified Ouroboros.Consensus.Node.GSM as GSM import Ouroboros.Consensus.Node.InitStorage import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.ProtocolInfo @@ -222,6 +226,9 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk -- | Ie 'bfcSalt' , llrnBfcSalt :: Int + -- | Ie 'gsmAntiThunderingHerd' + , llrnGsmAntiThunderingHerd :: StdGen + -- | Ie 'keepAliveRng' , llrnKeepAliveRng :: StdGen @@ -255,6 +262,10 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk -- | node-to-client protocol versions to run. , llrnNodeToClientVersions :: Map NodeToClientVersion (BlockNodeToClientVersion blk) + -- | If the volatile tip is older than this, then the node will exit the + -- @CaughtUp@ state. + , llrnMaxCaughtUpAge :: NominalDiffTime + -- | Maximum clock skew , llrnMaxClockSkew :: ClockSkew } @@ -365,8 +376,15 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = , ChainDB.cdbVolatileDbValidation = ValidateAll } - chainDB <- openChainDB registry inFuture cfg initLedger - llrnChainDbArgsDefaults customiseChainDbArgs' + let finalChainDbArgs = + mkFinalChainDbArgs + registry + inFuture + cfg + initLedger + llrnChainDbArgsDefaults + customiseChainDbArgs' + chainDB <- ChainDB.openDB finalChainDbArgs continueWithCleanChainDB chainDB $ do btime <- @@ -385,17 +403,29 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = , hfbtMaxClockRewind = secondsToNominalDiffTime 20 } - nodeKernelArgs <- - fmap (nodeKernelArgsEnforceInvariants . llrnCustomiseNodeKernelArgs) $ - mkNodeKernelArgs - registry - llrnBfcSalt - llrnKeepAliveRng - cfg - rnTraceConsensus - btime - (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) - chainDB + nodeKernelArgs <- do + durationUntilTooOld <- GSM.realDurationUntilTooOld + (configLedger cfg) + (ledgerState <$> ChainDB.getCurrentLedger chainDB) + llrnMaxCaughtUpAge + systemTime + let gsmMarkerFileView = + case ChainDB.cdbHasFSGsmDB finalChainDbArgs of + SomeHasFS x -> GSM.realMarkerFileView chainDB x + fmap (nodeKernelArgsEnforceInvariants . 
llrnCustomiseNodeKernelArgs) + $ mkNodeKernelArgs + registry + llrnBfcSalt + llrnGsmAntiThunderingHerd + llrnKeepAliveRng + cfg + rnTraceConsensus + btime + (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) + chainDB + llrnMaxCaughtUpAge + (Just durationUntilTooOld) + gsmMarkerFileView nodeKernel <- initNodeKernel nodeKernelArgs rnNodeKernelHook registry nodeKernel @@ -602,13 +632,25 @@ openChainDB :: -- ^ Customise the 'ChainDbArgs' -> m (ChainDB m blk) openChainDB registry inFuture cfg initLedger defArgs customiseArgs = - ChainDB.openDB args - where - args :: ChainDbArgs Identity m blk - args = customiseArgs $ - mkChainDbArgs registry inFuture cfg initLedger - (nodeImmutableDbChunkInfo (configStorage cfg)) - defArgs + ChainDB.openDB + $ mkFinalChainDbArgs registry inFuture cfg initLedger defArgs customiseArgs + +mkFinalChainDbArgs + :: forall m blk. (RunNode blk, IOLike m) + => ResourceRegistry m + -> CheckInFuture m blk + -> TopLevelConfig blk + -> ExtLedgerState blk + -- ^ Initial ledger + -> ChainDbArgs Defaults m blk + -> (ChainDbArgs Identity m blk -> ChainDbArgs Identity m blk) + -- ^ Customise the 'ChainDbArgs' + -> ChainDbArgs Identity m blk +mkFinalChainDbArgs registry inFuture cfg initLedger defArgs customiseArgs = + customiseArgs $ + mkChainDbArgs registry inFuture cfg initLedger + (nodeImmutableDbChunkInfo (configStorage cfg)) + defArgs mkChainDbArgs :: forall m blk. (RunNode blk, IOLike m) @@ -642,21 +684,29 @@ mkNodeKernelArgs :: => ResourceRegistry m -> Int -> StdGen + -> StdGen -> TopLevelConfig blk -> Tracers m (ConnectionId addrNTN) (ConnectionId addrNTC) blk -> BlockchainTime m -> InFutureCheck.SomeHeaderInFutureCheck m blk -> ChainDB m blk + -> NominalDiffTime + -> Maybe (GSM.WrapDurationUntilTooOld m blk) + -> GSM.MarkerFileView m -> m (NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk) mkNodeKernelArgs registry bfcSalt + gsmAntiThunderingHerd keepAliveRng cfg tracers btime chainSyncFutureCheck chainDB + maxCaughtUpAge + gsmDurationUntilTooOld + gsmMarkerFileView = do return NodeKernelArgs { tracers @@ -671,6 +721,12 @@ mkNodeKernelArgs , miniProtocolParameters = defaultMiniProtocolParameters , blockFetchConfiguration = defaultBlockFetchConfiguration , keepAliveRng + , gsmArgs = GsmNodeKernelArgs { + gsmAntiThunderingHerd + , gsmDurationUntilTooOld + , gsmMarkerFileView + , gsmMinCaughtUpDuration = maxCaughtUpAge + } } where defaultBlockFetchConfiguration :: BlockFetchConfiguration @@ -722,6 +778,9 @@ stdMkChainDbHasFS rootPath (ChainDB.RelativeMountPoint relPath) = stdBfcSaltIO :: IO Int stdBfcSaltIO = randomIO +stdGsmAntiThunderingHerdIO :: IO StdGen +stdGsmAntiThunderingHerdIO = newStdGen + stdKeepAliveRngIO :: IO StdGen stdKeepAliveRngIO = newStdGen @@ -839,12 +898,14 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo , rnPeerSharing } StdRunNodeArgs{..} = do - llrnBfcSalt <- stdBfcSaltIO - llrnKeepAliveRng <- stdKeepAliveRngIO + llrnBfcSalt <- stdBfcSaltIO + llrnGsmAntiThunderingHerd <- stdGsmAntiThunderingHerdIO + llrnKeepAliveRng <- stdKeepAliveRngIO pure LowLevelRunNodeArgs { llrnBfcSalt , llrnChainSyncTimeout = fromMaybe stdChainSyncTimeout srnChainSyncTimeout , llrnCustomiseHardForkBlockchainTimeArgs = id + , llrnGsmAntiThunderingHerd , llrnKeepAliveRng , llrnChainDbArgsDefaults = updateChainDbDefaults $ ChainDB.defaultArgs mkHasFS @@ -881,6 +942,7 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo (supportedNodeToClientVersions (Proxy @blk)) , llrnWithCheckedDB = stdWithCheckedDB (Proxy @blk) srnDatabasePath networkMagic + , 
llrnMaxCaughtUpAge = secondsToNominalDiffTime $ 20 * 60 -- 20 min , llrnMaxClockSkew = InFuture.defaultClockSkew } diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs new file mode 100644 index 0000000000..d8a088efd8 --- /dev/null +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs @@ -0,0 +1,494 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE Rank2Types #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE ViewPatterns #-} + +-- | The Genesis State Machine decides whether the node is caught-up or not. +module Ouroboros.Consensus.Node.GSM ( + CandidateVersusSelection (..) + , DurationFromNow (..) + , GsmEntryPoints (..) + , GsmNodeKernelArgs (..) + , GsmView (..) + , MarkerFileView (..) + , WrapDurationUntilTooOld (..) + -- * Auxiliaries + , TraceGsmEvent (..) + , initializationLedgerJudgement + -- * Constructors + , realDurationUntilTooOld + , realGsmEntryPoints + , realMarkerFileView + ) where + +import qualified Cardano.Slotting.Slot as Slot +import qualified Control.Concurrent.Class.MonadSTM.TVar as LazySTM +import Control.Monad (forever, join, unless) +import Control.Monad.Class.MonadSTM (MonadSTM, STM, atomically, check) +import Control.Monad.Class.MonadThrow (MonadThrow) +import Control.Monad.Class.MonadTimer (threadDelay) +import qualified Control.Monad.Class.MonadTimer.SI as SI +import Control.Tracer (Tracer, traceWith) +import Data.Functor ((<&>)) +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import Data.Time (NominalDiffTime) +import qualified Ouroboros.Consensus.BlockchainTime.WallClock.Types as Clock +import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork +import qualified Ouroboros.Consensus.HardFork.History as HardFork +import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry +import qualified Ouroboros.Consensus.Ledger.Basics as L +import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar) +import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement (..)) +import System.FS.API (HasFS, createDirectoryIfMissing, doesFileExist, + removeFile, withFile) +import System.FS.API.Types (AllowExisting (..), FsPath, OpenMode (..), + mkFsPath) +import System.Random (StdGen, uniformR) + +{------------------------------------------------------------------------------- + Interface +-------------------------------------------------------------------------------} + +data DurationFromNow = + After !NominalDiffTime + -- ^ INVARIANT positive + | + Already + -- ^ This value represents all non-positive durations, ie events from the + -- past + deriving (Eq, Show) + +data CandidateVersusSelection = + CandidateDoesNotIntersect + -- ^ The GSM assumes that this is ephemeral + -- + -- For example, the ChainSync client will either disconnect from the peer + -- or update the candidate to one that is not stale. It's also technically + -- possible that the selection is stale, which the ChainDB would also + -- resolve as soon as possible. 
+ | + WhetherCandidateIsBetter !Bool + -- ^ Whether the candidate is better than the selection + deriving (Eq, Show) + +data GsmView m upstreamPeer selection candidate = GsmView { + antiThunderingHerd :: Maybe StdGen + -- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up + -- to 15% every transition from OnlyBootstrap to CaughtUp, in order to + -- avoid a thundering herd phenemenon. + -- + -- 'Nothing' should only be used for testing. + , + candidateOverSelection :: + selection -> candidate -> CandidateVersusSelection + , + durationUntilTooOld :: Maybe (selection -> m DurationFromNow) + -- ^ How long from now until the selection will be so old that the node + -- should exit the @CaughtUp@ state + -- + -- 'Nothing' means the selection can never become too old. + , + equivalent :: selection -> selection -> Bool + -- ^ Whether the two selections are equivalent for the purpose of the + -- Genesis State Machine + , + getChainSyncCandidates :: + STM m (Map.Map upstreamPeer (StrictTVar m candidate)) + -- ^ The latest candidates from the upstream ChainSync peers + , + getChainSyncIdlers :: STM m (Set.Set upstreamPeer) + -- ^ The ChainSync peers whose latest message claimed that they have no + -- subsequent headers + , + getCurrentSelection :: STM m selection + -- ^ The node's current selection + , + minCaughtUpDuration :: NominalDiffTime + -- ^ How long the node must stay in CaughtUp after transitioning to it from + -- OnlyBootstrap, regardless of the selection's age. This prevents the + -- whole network from thrashing between CaughtUp and OnlyBootstrap if + -- there's an outage in block production. + -- + -- See also 'antiThunderingHerd'. + , + setCaughtUpPersistentMark :: Bool -> m () + -- ^ EG touch/delete the marker file on disk + , + writeLedgerStateJudgement :: LedgerStateJudgement -> m () + -- ^ EG update the TVar that the Diffusion Layer monitors + } + +-- | The two proper GSM states for boot strap peers +-- +-- See the @BootstrapPeersIER.md@ document for their specification. +-- +-- See 'initializationLedgerJudgement' for the @Initializing@ pseudo-state. +data GsmEntryPoints m = GsmEntryPoints { + enterCaughtUp :: forall neverTerminates. m neverTerminates + -- ^ ASSUMPTION the marker file is present on disk, a la + -- @'setCaughtUpPersistentMark' True@ + -- + -- Thus this can be invoked at node start up after determining the marker + -- file is present (and the tip is still not stale) + , + enterOnlyBootstrap :: forall neverTerminates. m neverTerminates + -- ^ ASSUMPTION the marker file is absent on disk, a la + -- @'setCaughtUpPersistentMark' False@ + -- + -- Thus this can be invoked at node start up after determining the marker + -- file is absent. + } + +----- + +-- | Determine the initial 'LedgerStateJudgment' +-- +-- Also initializes the persistent marker file. 
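As a rough sketch of how the 'GsmView' record above fits together (not part of this patch; the helper name and the trivial field values are invented, and qualified imports of Data.Map.Strict as Map and Data.Set as Set plus the io-classes MonadSTM class are assumed), a view with no ChainSync peers, no age limit, and no on-disk marker can be built from nothing but pure STM actions:

trivialGsmView :: MonadSTM m => STM m sel -> GsmView m peer sel cand
trivialGsmView getSelection = GsmView
  { antiThunderingHerd        = Nothing        -- deterministic, as in tests
  , candidateOverSelection    = \_sel _cand -> WhetherCandidateIsBetter False
  , durationUntilTooOld       = Nothing        -- the selection never ages out
  , equivalent                = \_ _ -> True
  , getChainSyncCandidates    = pure Map.empty -- no upstream peers
  , getChainSyncIdlers        = pure Set.empty
  , getCurrentSelection       = getSelection
  , minCaughtUpDuration       = 0
  , setCaughtUpPersistentMark = \_ -> pure ()  -- no marker file
  , writeLedgerStateJudgement = \_ -> pure ()
  }

Feeding such a view to 'realGsmEntryPoints' yields a GSM that never leaves OnlyBootstrap, because with an empty candidate map the first stage of 'blockUntilCaughtUp' can never be satisfied; the real node instead wires these fields to the ChainDB and to the ChainSync candidate and idler variables, as the NodeKernel changes below show.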
+initializationLedgerJudgement :: + ( L.GetTip (L.LedgerState blk) + , Monad m + ) + => m (L.LedgerState blk) + -> Maybe (WrapDurationUntilTooOld m blk) + -- ^ 'Nothing' if @blk@ has no age limit + -> MarkerFileView m + -> m LedgerStateJudgement +initializationLedgerJudgement + getCurrentLedger + mbDurationUntilTooOld + markerFileView + = do + wasCaughtUp <- hasMarkerFile markerFileView + if not wasCaughtUp then pure TooOld else do + case mbDurationUntilTooOld of + Nothing -> return YoungEnough + Just wd -> do + sno <- L.getTipSlot <$> getCurrentLedger + getDurationUntilTooOld wd sno >>= \case + After{} -> return YoungEnough + Already -> do + removeMarkerFile markerFileView + return TooOld + +{------------------------------------------------------------------------------- + A real implementation +-------------------------------------------------------------------------------} + +-- | The actual GSM logic for boot strap peers +-- +-- See the @BootstrapPeersIER.md@ document for the specification of this logic. +realGsmEntryPoints :: forall m upstreamPeer selection tracedSelection candidate. + ( SI.MonadDelay m + , SI.MonadTimer m + , Eq upstreamPeer + ) + => (selection -> tracedSelection, Tracer m (TraceGsmEvent tracedSelection)) + -> GsmView m upstreamPeer selection candidate + -> GsmEntryPoints m +realGsmEntryPoints tracerArgs gsmView = GsmEntryPoints { + enterCaughtUp + , + enterOnlyBootstrap + } + where + (cnvSelection, tracer) = tracerArgs + + GsmView { + antiThunderingHerd + , + candidateOverSelection + , + durationUntilTooOld + , + equivalent + , + getChainSyncCandidates + , + getChainSyncIdlers + , + getCurrentSelection + , + minCaughtUpDuration + , + setCaughtUpPersistentMark + , + writeLedgerStateJudgement + } = gsmView + + enterCaughtUp :: forall neverTerminates. m neverTerminates + enterCaughtUp = enterCaughtUp' antiThunderingHerd + + enterOnlyBootstrap :: forall neverTerminates. m neverTerminates + enterOnlyBootstrap = enterOnlyBootstrap' antiThunderingHerd + + enterCaughtUp' :: forall neverTerminates. Maybe StdGen -> m neverTerminates + enterCaughtUp' g = do + (g', ev) <- blockWhileCaughtUp g + + setCaughtUpPersistentMark False + writeLedgerStateJudgement TooOld + traceWith tracer ev + + enterOnlyBootstrap' g' + + enterOnlyBootstrap' :: Maybe StdGen -> forall neverTerminates. m neverTerminates + enterOnlyBootstrap' g = do + ev <- blockUntilCaughtUp + + writeLedgerStateJudgement YoungEnough + setCaughtUpPersistentMark True + traceWith tracer ev + + -- When transitioning from OnlyBootstrap to CaughtUp, the node will + -- remain in CaughtUp for at least 'minCaughtUpDuration', regardless of + -- the selection's age. + SI.threadDelay $ realToFrac minCaughtUpDuration + + enterCaughtUp' g + + blockWhileCaughtUp :: + Maybe StdGen + -> m (Maybe StdGen, TraceGsmEvent tracedSelection) + blockWhileCaughtUp g = do + -- Randomly add up to 5min. + -- + -- Under the ideal circumstances, nodes have perfectly synchronized + -- clocks. However, if there's a block production outage, that means + -- /all/ nodes will switch back to the bootstrap peers + -- /simultaneously/, incurring a thundering herd of requests on that + -- relatively small population. This random change will spread that + -- load out. + -- + -- TODO should the Diffusion Layer do this? IE the node /promptly/ + -- switches to OnlyBootstrap, but then the Diffusion Layer introces a + -- delay before reaching out to the bootstrap peers? 
+ let (bonus, g') = case g of + Nothing -> (0, Nothing) -- it's disabled in some tests + Just x -> + let (seconds, !g'') = + uniformR (0, 300 :: Int) x + in + (fromIntegral seconds, Just g'') + + ev <- atomically getCurrentSelection >>= blockWhileCaughtUpHelper bonus + + pure (g', ev) + + blockWhileCaughtUpHelper :: + SI.DiffTime + -> selection + -> m (TraceGsmEvent tracedSelection) + blockWhileCaughtUpHelper bonus selection = do + let tracedSelection = cnvSelection selection + + computeDuration :: m (Maybe DurationFromNow) + computeDuration = mapM ($ selection) durationUntilTooOld + computeDuration >>= \case + Nothing -> forever $ threadDelay maxBound + Just Already -> do -- it's already too old + pure $ GsmEventLeaveCaughtUp tracedSelection Already + Just (After dur) -> do + varTimeoutExpired <- SI.registerDelay (realToFrac dur + bonus) + + -- If the selection changes before the timeout expires, loop to + -- setup a new timeout for the new tip. + -- + -- Otherwise the timeout expired before the selection changed + -- (or they both happened after the previous attempt of this + -- STM transaction), so the node is no longer in @CaughtUp@. + join $ atomically $ do + expired <- LazySTM.readTVar varTimeoutExpired + let ev = GsmEventLeaveCaughtUp tracedSelection (After dur) + if expired then pure (pure ev) else do + selection' <- getCurrentSelection + check $ not $ equivalent selection selection' + pure $ blockWhileCaughtUpHelper bonus selection' + + blockUntilCaughtUp :: m (TraceGsmEvent tracedSelection) + blockUntilCaughtUp = atomically $ do + -- STAGE 1: all ChainSync clients report no subsequent headers + idlers <- getChainSyncIdlers + varsCandidate <- getChainSyncCandidates + check $ + 0 < Map.size varsCandidate + && Set.size idlers == Map.size varsCandidate + && idlers == Map.keysSet varsCandidate + + -- STAGE 2: no candidate is better than the node's current + -- selection + -- + -- For the Bootstrap State Machine, it's fine to completely ignore + -- block diffusion pipelining here, because all bootstrap peers will + -- /promptly/ rollback the tentative header if its block body turns out + -- to be invalid (aka /trap header/). Thus the node will stay in + -- CaughtUp slighty longer, until the system is no longer pipelining a + -- block; general Praos reasoning ensures that won't take particularly + -- long. + selection <- getCurrentSelection + candidates <- traverse StrictSTM.readTVar varsCandidate + let ok candidate = + WhetherCandidateIsBetter False + == candidateOverSelection selection candidate + check $ all ok candidates + + pure $ GsmEventEnterCaughtUp + (Set.size idlers) + (cnvSelection selection) + + -- STAGE 3: the previous stages weren't so slow that the idler + -- set/candidate set/individual candidates changed + -- + -- At this point, the STM scheduler will automatically retry this + -- transaction if and only if any of the TVars are no longer + -- pointer-equal to what was read above. That outcome is unlikely as + -- long as there are not a huge number of peers; as Simon Marlow wrote, + -- "Never read an unbounded number of TVars in a single transaction + -- because the O(n) performance of readTVar then gives O(n*n) for the + -- whole transaction." + -- + -- (NSF: I peeked at ghc/rts/STM.c today. The thing being counted by + -- the O(n*n) notation in the quote above is iterations of a C for loop + -- that reads a C array. The transaction log is a linked list of + -- chunks, each a 16 element array. 
So the 4 node kernel tvars + one + -- tvar for each of the first 12 peers fill up the first chunk, and + -- then there's a new chunk for each group of 16 peers beyond that. For + -- example, 44 peers would exactly fill 3 chunks. Thus, each readTVar + -- pages in at most 4 VM pages for the number of peers we're + -- anticipating. And then the STM validation at the end touches them + -- all one last time. Summary: seems likely to be fast enough.) + +data TraceGsmEvent selection = + GsmEventEnterCaughtUp !Int !selection + -- ^ how many peers and the current selection + | + GsmEventLeaveCaughtUp !selection !DurationFromNow + -- ^ the current selection and its age + deriving (Eq, Show) + +{------------------------------------------------------------------------------- + A helper for constructing a real 'GsmView' +-------------------------------------------------------------------------------} + +newtype WrapDurationUntilTooOld m blk = DurationUntilTooOld { + getDurationUntilTooOld :: Slot.WithOrigin Slot.SlotNo -> m DurationFromNow + } + +-- | The real system's 'durationUntilTooOld' +realDurationUntilTooOld :: + ( HardFork.HasHardForkHistory blk + , MonadSTM m + ) + => L.LedgerConfig blk + -> STM m (L.LedgerState blk) + -> NominalDiffTime + -- ^ If the volatile tip is older than this, then the node will exit the + -- @CaughtUp@ state. + -- + -- Eg 'Ouroboros.Consensus.Node.llrnMaxCaughtUpAge' + -- + -- WARNING This function returns 'Already' if the wall clock is beyond the + -- current ledger state's translation horizon; that may be confusing if an + -- unexpectedly large 'NominalDiffTime' is given here (eg 1 one week). + -> Clock.SystemTime m + -> m (WrapDurationUntilTooOld m blk) +realDurationUntilTooOld lcfg getLedgerState maxCaughtUpAge systemTime = do + runner <- + HardFork.runWithCachedSummary + $ HardFork.hardForkSummary lcfg <$> getLedgerState + pure $ DurationUntilTooOld $ \woSlot -> do + now <- Clock.systemTimeCurrent systemTime + case woSlot of + Slot.Origin -> pure $ toDur now $ Clock.RelativeTime 0 + Slot.At slot -> do + let qry = Qry.slotToWallclock slot + atomically $ HardFork.cachedRunQuery runner qry <&> \case + Left Qry.PastHorizon{} -> Already + Right (onset, _slotLen) -> toDur now onset + where + toDur + (Clock.RelativeTime now) + (Clock.getRelativeTime -> (+ maxCaughtUpAge) -> limit) + = if limit <= now then Already else After (limit - now) + +{------------------------------------------------------------------------------- + A helper for constructing a real 'GsmView' + + TODO should these operations properly be part of the ChainDB? +-------------------------------------------------------------------------------} + +-- | A view on the GSM's /Caught-Up persistent marker/ file +-- +-- These comments constrain the result of 'realMarkerFile'; mock views in +-- testing are free to be different. +data MarkerFileView m = MarkerFileView { + hasMarkerFile :: m Bool + , + -- | Remove the marker file + -- + -- Will throw an 'FsResourceDoesNotExist' error when it does not exist. + removeMarkerFile :: m () + , + -- | Create the marker file + -- + -- Idempotent. + touchMarkerFile :: m () + } + +-- | The real system's 'MarkerFileView' +-- +-- The strict 'ChainDB' argument is unused, but its existence ensures there's +-- only one process using this file system. 
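For intuition, the age check computed by 'toDur' in 'realDurationUntilTooOld' above can be restated as a standalone pure function (a sketch, not part of the patch; the name is invented):

-- The selection becomes too old exactly 'maxCaughtUpAge' after the onset of
-- its tip's slot, both measured relative to the system start.
durationFromNow ::
     NominalDiffTime  -- ^ now
  -> NominalDiffTime  -- ^ onset of the tip's slot
  -> NominalDiffTime  -- ^ maxCaughtUpAge
  -> DurationFromNow
durationFromNow now onset maxCaughtUpAge
    | limit <= now = Already
    | otherwise    = After (limit - now)
  where
    limit = onset + maxCaughtUpAge

With the default 'llrnMaxCaughtUpAge' of 20 minutes from the Node.hs hunk above, a tip whose slot began 25 minutes ago yields Already, while one whose slot began 5 minutes ago yields After with 15 minutes remaining.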
+realMarkerFileView :: + MonadThrow m + => ChainDB m blk + -> HasFS m h + -- ^ should be independent of other filesystems, eg @gsm/@ + -> MarkerFileView m +realMarkerFileView !_cdb hasFS = + MarkerFileView { + hasMarkerFile + , + removeMarkerFile = removeFile hasFS markerFile + , + touchMarkerFile = do + createDirectoryIfMissing hasFS True (mkFsPath []) + alreadyExists <- hasMarkerFile + unless alreadyExists $ + withFile hasFS markerFile (WriteMode MustBeNew) $ \_h -> + return () + } + where + hasMarkerFile = doesFileExist hasFS markerFile + +-- | The path to the GSM's /Caught-Up persistent marker/ inside its dedicated +-- 'HasFS' +-- +-- If the file is present on node initialization, then the node was in the +-- @CaughtUp@ state when it shut down. +markerFile :: FsPath +markerFile = mkFsPath ["CaughtUpMarker"] + +{------------------------------------------------------------------------------- + A helper for the NodeKernel +-------------------------------------------------------------------------------} + +-- | Arguments the NodeKernel has to take because of the GSM +data GsmNodeKernelArgs m blk = GsmNodeKernelArgs { + gsmAntiThunderingHerd :: StdGen + -- ^ See 'antiThunderingHerd' + , + gsmDurationUntilTooOld :: Maybe (WrapDurationUntilTooOld m blk) + -- ^ See 'durationUntilTooOld' + , + gsmMarkerFileView :: MarkerFileView m + , + gsmMinCaughtUpDuration :: NominalDiffTime + -- ^ See 'minCaughtUpDuration' + } diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs index a7f4f9ad54..bfe2a77849 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs @@ -34,6 +34,8 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Server (TraceChainSyncServerEvent) import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server (TraceLocalTxSubmissionServerEvent (..)) +import Ouroboros.Consensus.Node.GSM (TraceGsmEvent) +import Ouroboros.Network.Block (Tip) import Ouroboros.Network.BlockFetch (FetchDecision, TraceFetchClientState, TraceLabelPeer) import Ouroboros.Network.KeepAlive (TraceKeepAliveClient) @@ -62,6 +64,7 @@ data Tracers' remotePeer localPeer blk f = Tracers , forgeStateInfoTracer :: f (TraceLabelCreds (ForgeStateInfo blk)) , keepAliveClientTracer :: f (TraceKeepAliveClient remotePeer) , consensusErrorTracer :: f SomeException + , gsmTracer :: f (TraceGsmEvent (Tip blk)) } instance (forall a. Semigroup (f a)) @@ -82,6 +85,7 @@ instance (forall a. Semigroup (f a)) , forgeStateInfoTracer = f forgeStateInfoTracer , keepAliveClientTracer = f keepAliveClientTracer , consensusErrorTracer = f consensusErrorTracer + , gsmTracer = f gsmTracer } where f :: forall a. 
Semigroup a @@ -110,6 +114,7 @@ nullTracers = Tracers , forgeStateInfoTracer = nullTracer , keepAliveClientTracer = nullTracer , consensusErrorTracer = nullTracer + , gsmTracer = nullTracer } showTracers :: ( Show blk @@ -141,6 +146,7 @@ showTracers tr = Tracers , forgeStateInfoTracer = showTracing tr , keepAliveClientTracer = showTracing tr , consensusErrorTracer = showTracing tr + , gsmTracer = showTracing tr } {------------------------------------------------------------------------------- diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index b63b7bd6c5..db2502fcd8 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -31,16 +31,20 @@ module Ouroboros.Consensus.NodeKernel ( import qualified Control.Concurrent.Class.MonadSTM as LazySTM import Control.DeepSeq (force) import Control.Monad +import qualified Control.Monad.Class.MonadTimer.SI as SI import Control.Monad.Except import Control.Tracer import Data.Bifunctor (second) import Data.Data (Typeable) import Data.Foldable (traverse_) +import Data.Function (on) +import Data.Functor ((<&>)) import Data.Hashable (Hashable) import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import Data.Maybe (isJust, mapMaybe) import Data.Proxy +import Data.Set (Set) import qualified Data.Text as Text import Data.Void (Void) import Ouroboros.Consensus.Block hiding (blockMatchesHeader) @@ -58,6 +62,8 @@ import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck (SomeHeaderInFutureCheck) +import Ouroboros.Consensus.Node.GSM (GsmNodeKernelArgs (..)) +import qualified Ouroboros.Consensus.Node.GSM as GSM import Ouroboros.Consensus.Node.Run import Ouroboros.Consensus.Node.Tracers import Ouroboros.Consensus.Protocol.Abstract @@ -67,6 +73,8 @@ import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as InvalidBlockPunishment import Ouroboros.Consensus.Storage.ChainDB.Init (InitChainDB) import qualified Ouroboros.Consensus.Storage.ChainDB.Init as InitChainDB +import Ouroboros.Consensus.Util.AnchoredFragment + (preferAnchoredCandidate) import Ouroboros.Consensus.Util.EarlyExit import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () @@ -75,9 +83,12 @@ import Ouroboros.Consensus.Util.STM import Ouroboros.Network.AnchoredFragment (AnchoredFragment, AnchoredSeq (..)) import qualified Ouroboros.Network.AnchoredFragment as AF +import Ouroboros.Network.Block (castTip, tipFromHeader) import Ouroboros.Network.BlockFetch import Ouroboros.Network.NodeToNode (ConnectionId, MiniProtocolParameters (..)) +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement (..)) import Ouroboros.Network.PeerSharing (PeerSharingRegistry, newPeerSharingRegistry) import Ouroboros.Network.TxSubmission.Inbound @@ -95,36 +106,44 @@ import System.Random (StdGen) -- | Interface against running relay node data NodeKernel m addrNTN addrNTC blk = NodeKernel { -- | The 'ChainDB' of the node - getChainDB :: ChainDB m blk + getChainDB :: ChainDB m blk -- | The node's mempool - , 
getMempool :: Mempool m blk + , getMempool :: Mempool m blk -- | The node's top-level static configuration - , getTopLevelConfig :: TopLevelConfig blk + , getTopLevelConfig :: TopLevelConfig blk -- | The fetch client registry, used for the block fetch clients. - , getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m + , getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m -- | The fetch mode, used by diffusion. -- - , getFetchMode :: STM m FetchMode + , getFetchMode :: STM m FetchMode + + -- | The ledger judgement, used by diffusion. + -- + , getLedgerStateJudgement :: STM m LedgerStateJudgement -- | Read the current candidates - , getNodeCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk)))) + , getNodeCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk)))) + + -- | Read the set of peers that have claimed to have no subsequent + -- headers beyond their current candidate + , getNodeIdlers :: StrictTVar m (Set (ConnectionId addrNTN)) -- | Read the current peer sharing registry, used for interacting with -- the PeerSharing protocol - , getPeerSharingRegistry :: PeerSharingRegistry addrNTN m + , getPeerSharingRegistry :: PeerSharingRegistry addrNTN m -- | The node's tracers - , getTracers :: Tracers m (ConnectionId addrNTN) addrNTC blk + , getTracers :: Tracers m (ConnectionId addrNTN) addrNTC blk -- | Set block forging -- -- When set with the empty list '[]' block forging will be disabled. -- - , setBlockForging :: [BlockForging m blk] -> m () + , setBlockForging :: [BlockForging m blk] -> m () } -- | Arguments required when initializing a node @@ -141,11 +160,13 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs { , miniProtocolParameters :: MiniProtocolParameters , blockFetchConfiguration :: BlockFetchConfiguration , keepAliveRng :: StdGen + , gsmArgs :: GsmNodeKernelArgs m blk } initNodeKernel :: forall m addrNTN addrNTC blk. ( IOLike m + , SI.MonadTimer m , RunNode blk , Ord addrNTN , Hashable addrNTN @@ -156,19 +177,67 @@ initNodeKernel :: initNodeKernel args@NodeKernelArgs { registry, cfg, tracers , chainDB, initChainDB , blockFetchConfiguration + , gsmArgs } = do -- using a lazy 'TVar', 'BlockForging' does not have a 'NoThunks' instance. blockForgingVar :: LazySTM.TMVar m [BlockForging m blk] <- LazySTM.newTMVarIO [] initChainDB (configStorage cfg) (InitChainDB.fromFull chainDB) st <- initInternalState args + let IS + { blockFetchInterface + , fetchClientRegistry + , mempool + , peerSharingRegistry + , varCandidates + , varIdlers + , varLedgerJudgement + } = st + + do let GsmNodeKernelArgs {..} = gsmArgs + gsmTracerArgs = + ( castTip . either AF.anchorToTip tipFromHeader . AF.head . fst + , gsmTracer tracers + ) + + let gsm = GSM.realGsmEntryPoints gsmTracerArgs GSM.GsmView + { GSM.antiThunderingHerd = Just gsmAntiThunderingHerd + , GSM.candidateOverSelection = \(headers, _lst) candidate -> + case AF.intersectionPoint headers candidate of + Nothing -> GSM.CandidateDoesNotIntersect + Just{} -> + GSM.WhetherCandidateIsBetter + $ -- precondition requires intersection + preferAnchoredCandidate + (configBlock cfg) + headers + candidate + , GSM.durationUntilTooOld = + gsmDurationUntilTooOld + <&> \wd (_headers, lst) -> + GSM.getDurationUntilTooOld wd (getTipSlot lst) + , GSM.equivalent = (==) `on` (AF.headPoint . 
fst) + , GSM.getChainSyncCandidates = readTVar varCandidates + , GSM.getChainSyncIdlers = readTVar varIdlers + , GSM.getCurrentSelection = do + headers <- ChainDB.getCurrentChain chainDB + extLedgerState <- ChainDB.getCurrentLedger chainDB + return (headers, ledgerState extLedgerState) + , GSM.minCaughtUpDuration = gsmMinCaughtUpDuration + , GSM.setCaughtUpPersistentMark = \upd -> + (if upd then GSM.touchMarkerFile else GSM.removeMarkerFile) + gsmMarkerFileView + , GSM.writeLedgerStateJudgement = \x -> atomically $ do + writeTVar varLedgerJudgement x + } + judgment <- readTVarIO varLedgerJudgement + void $ forkLinkedThread registry "NodeKernel.GSM" $ case judgment of + TooOld -> GSM.enterOnlyBootstrap gsm + YoungEnough -> GSM.enterCaughtUp gsm void $ forkLinkedThread registry "NodeKernel.blockForging" $ blockForgingController st (LazySTM.takeTMVar blockForgingVar) - let IS { blockFetchInterface, fetchClientRegistry, varCandidates, - peerSharingRegistry, mempool } = st - -- Run the block fetch logic in the background. This will call -- 'addFetchedBlock' whenever a new block is downloaded. void $ forkLinkedThread registry "NodeKernel.blockFetchLogic" $ @@ -180,15 +249,17 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers blockFetchConfiguration return NodeKernel - { getChainDB = chainDB - , getMempool = mempool - , getTopLevelConfig = cfg - , getFetchClientRegistry = fetchClientRegistry - , getFetchMode = readFetchMode blockFetchInterface - , getNodeCandidates = varCandidates - , getPeerSharingRegistry = peerSharingRegistry - , getTracers = tracers - , setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! a + { getChainDB = chainDB + , getMempool = mempool + , getTopLevelConfig = cfg + , getFetchClientRegistry = fetchClientRegistry + , getFetchMode = readFetchMode blockFetchInterface + , getLedgerStateJudgement = readTVar varLedgerJudgement + , getNodeCandidates = varCandidates + , getNodeIdlers = varIdlers + , getPeerSharingRegistry = peerSharingRegistry + , getTracers = tracers + , setBlockForging = \a -> atomically . LazySTM.putTMVar blockForgingVar $! 
a } where blockForgingController :: InternalState m remotePeer localPeer blk @@ -216,8 +287,10 @@ data InternalState m addrNTN addrNTC blk = IS { , blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m , fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m , varCandidates :: StrictTVar m (Map (ConnectionId addrNTN) (StrictTVar m (AnchoredFragment (Header blk)))) + , varIdlers :: StrictTVar m (Set (ConnectionId addrNTN)) , mempool :: Mempool m blk , peerSharingRegistry :: PeerSharingRegistry addrNTN m + , varLedgerJudgement :: StrictTVar m LedgerStateJudgement } initInternalState :: @@ -232,8 +305,18 @@ initInternalState :: initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg , blockFetchSize, btime , mempoolCapacityOverride + , gsmArgs } = do + varLedgerJudgement <- do + let GsmNodeKernelArgs {..} = gsmArgs + j <- GSM.initializationLedgerJudgement + (atomically $ ledgerState <$> ChainDB.getCurrentLedger chainDB) + gsmDurationUntilTooOld + gsmMarkerFileView + newTVarIO j + varCandidates <- newTVarIO mempty + varIdlers <- newTVarIO mempty mempool <- openMempool registry (chainDBLedgerInterface chainDB) (configLedger cfg) diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 89025e7e50..ff7e9990be 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -73,6 +73,7 @@ import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy +import qualified Ouroboros.Consensus.Node.GSM as GSM import Ouroboros.Consensus.Node.InitStorage import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.ProtocolInfo @@ -997,6 +998,16 @@ runThreadNetwork systemTime ThreadNetworkArgs -- blockfetch descision interval. 
, bfcSalt = 0 } + , gsmArgs = GSM.GsmNodeKernelArgs { + gsmAntiThunderingHerd = kaRng + , gsmDurationUntilTooOld = Nothing + , gsmMarkerFileView = GSM.MarkerFileView { + touchMarkerFile = pure () + , removeMarkerFile = pure () + , hasMarkerFile = pure False + } + , gsmMinCaughtUpDuration = 0 + } } nodeKernel <- initNodeKernel nodeKernelArgs @@ -1458,9 +1469,10 @@ newNodeInfo = do (v1, m1) <- mk (v2, m2) <- mk (v3, m3) <- mk + (v4, m4) <- mk pure - ( NodeDBs v1 v2 v3 - , NodeDBs <$> m1 <*> m2 <*> m3 + ( NodeDBs v1 v2 v3 v4 + , NodeDBs <$> m1 <*> m2 <*> m3 <*> m4 ) pure diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/BlockFetch.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/BlockFetch.hs index dd60e81f8e..45f9c73762 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/BlockFetch.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/BlockFetch.hs @@ -15,6 +15,7 @@ module Test.Consensus.PeerSimulator.BlockFetch ( import Control.Monad (void) import Control.Monad.Class.MonadTime +import Control.Monad.Class.MonadTimer.SI (MonadTimer) import Control.Tracer (nullTracer) import Data.Hashable (Hashable) import Data.Map.Strict (Map) @@ -104,7 +105,7 @@ startKeepAliveThread registry fetchClientRegistry peerId = atomically retry runBlockFetchClient :: - (Ord peer, IOLike m, MonadTime m) + (Ord peer, IOLike m, MonadTime m, MonadTimer m) => peer -> FetchClientRegistry peer (Header TestBlock) TestBlock m -> ControlMessageSTM m diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs index f3eb1e3655..de4f1fcfe4 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs @@ -103,6 +103,8 @@ basicChainSyncClient tracer cfg chainDbView varCandidate = , CSClient.controlMessageSTM = return Continue , CSClient.headerMetricsTracer = nullTracer , CSClient.varCandidate + , CSClient.startIdling = pure () + , CSClient.stopIdling = pure () } where dummyHeaderInFutureCheck :: @@ -180,7 +182,7 @@ startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDb -- | Start the BlockFetch client, using the supplied 'FetchClientRegistry' to -- register it for synchronization with the ChainSync client. 
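The '(startIdling, stopIdling)' pair that 'basicChainSyncClient' above stubs out with 'pure ()' is, in the real 'bracketChainSyncClient' later in this patch, just an insert and a delete on a shared set of idling peers. A minimal sketch of that shape (the helper name is invented; 'IOLike', 'StrictTVar' and 'modifyTVar' are the consensus-layer strict-STM utilities already used in these modules):

-- Build the per-peer idling callbacks from the shared idler set that the GSM
-- reads via 'getChainSyncIdlers'.
mkIdlingCallbacks ::
     (IOLike m, Ord peer)
  => StrictTVar m (Set peer)
  -> peer
  -> (m (), m ())  -- ^ (startIdling, stopIdling)
mkIdlingCallbacks varIdling peer =
  ( atomically $ modifyTVar varIdling $ Set.insert peer
  , atomically $ modifyTVar varIdling $ Set.delete peer
  )

Test harnesses that never consult the idler set, such as the peer simulator here, can keep the 'pure ()' stubs.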
startBlockFetchConnectionThread :: - (IOLike m, MonadTime m) => + (IOLike m, MonadTime m, MonadTimer m) => ResourceRegistry m -> FetchClientRegistry PeerId (Header TestBlock) TestBlock m -> ControlMessageSTM m -> diff --git a/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs b/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs index 10575e9e6d..0b1882ffd0 100644 --- a/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs +++ b/ouroboros-consensus/bench/ChainSync-client-bench/Main.hs @@ -135,6 +135,8 @@ oneBenchRun , CSClient.controlMessageSTM = return Continue , CSClient.headerMetricsTracer = nullTracer , CSClient.varCandidate + , CSClient.startIdling = pure () + , CSClient.stopIdling = pure () } server :: ChainSyncServer H (Point B) (Tip B) IO () diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index ef973f45c8..0daac29c5c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -64,6 +64,8 @@ import Data.Kind (Type) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Proxy +import Data.Set (Set) +import qualified Data.Set as Set import Data.Typeable import Data.Word (Word64) import GHC.Generics (Generic) @@ -161,14 +163,28 @@ bracketChainSyncClient :: -> StrictTVar m (Map peer (StrictTVar m (AnchoredFragment (Header blk)))) -- ^ The candidate chains, we need the whole map because we -- (de)register nodes (@peer@). + -> StrictTVar m (Set peer) + -- ^ This ChainSync client should ensure that its peer is in this set while + -- and only while both of the following conditions are satisfied: the + -- peer's latest -- message has been fully processed (especially that its + -- candidate has been updated; previous argument) and its latest message + -- did not claim that it already has headers that extend its candidate. + -- + -- It's more important that the client is removed from the set promptly + -- than it is for the client to be added promptly, because of how this is + -- used by the GSM to determine that the node is done syncing. 
-> peer -> NodeToNodeVersion - -> (StrictTVar m (AnchoredFragment (Header blk)) -> m a) + -> ( StrictTVar m (AnchoredFragment (Header blk)) + -> (m (), m ()) + -> m a + ) -> m a bracketChainSyncClient tracer ChainDbView { getIsInvalidBlock } varCandidates + varIdling peer version body @@ -178,15 +194,20 @@ bracketChainSyncClient withWatcher "ChainSync.Client.rejectInvalidBlocks" (invalidBlockWatcher varCandidate) - $ body varCandidate + $ body + varCandidate + ( atomically $ modifyTVar varIdling $ Set.insert peer + , atomically $ modifyTVar varIdling $ Set.delete peer + ) where newCandidateVar = do varCandidate <- newTVarIO $ AF.Empty AF.AnchorGenesis atomically $ modifyTVar varCandidates $ Map.insert peer varCandidate return varCandidate - releaseCandidateVar _ = do - atomically $ modifyTVar varCandidates $ Map.delete peer + releaseCandidateVar _ = atomically $ do + modifyTVar varCandidates $ Map.delete peer + modifyTVar varIdling $ Set.delete peer invalidBlockWatcher varCandidate = invalidBlockRejector @@ -495,6 +516,12 @@ data DynamicEnv m blk = DynamicEnv { , controlMessageSTM :: ControlMessageSTM m , headerMetricsTracer :: HeaderMetricsTracer m , varCandidate :: StrictTVar m (AnchoredFragment (Header blk)) + , startIdling :: m () + -- ^ Insert the peer into the idling set argument of + -- 'bracketChainSyncClient' + , stopIdling :: m () + -- ^ Remove the peer from the idling set argument of + -- 'bracketChainSyncClient' } -- | General values collectively needed by the top-level entry points @@ -586,6 +613,10 @@ chainSyncClient cfgEnv dynEnv = getCurrentChain } = chainDbView + DynamicEnv { + stopIdling + } = dynEnv + mkIntEnv :: InFutureCheck.HeaderInFutureCheck m blk arrival judgment -> InternalEnv m blk arrival judgment @@ -629,7 +660,7 @@ chainSyncClient cfgEnv dynEnv = recvMsgRollForward = \_hdr _tip -> go n' s , recvMsgRollBackward = \_pt _tip -> go n' s } - in Stateful $ go n0 + in Stateful $ \s -> do stopIdling; go n0 s terminate :: ChainSyncClientResult @@ -855,6 +886,8 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = DynamicEnv { controlMessageSTM , headerMetricsTracer + , startIdling + , stopIdling , varCandidate } = dynEnv @@ -930,22 +963,21 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = requestNext kis mkPipelineDecision n theirTip candTipBlockNo = let theirTipBlockNo = getTipBlockNo (unTheir theirTip) decision = - runPipelineDecision - mkPipelineDecision - n - candTipBlockNo - theirTipBlockNo + runPipelineDecision + mkPipelineDecision + n + candTipBlockNo + theirTipBlockNo in case (n, decision) of (Zero, (Request, mkPipelineDecision')) -> SendMsgRequestNext + startIdling -- on MsgAwaitReply (handleNext kis mkPipelineDecision' Zero) - ( -- when we have to wait - return $ handleNext kis mkPipelineDecision' Zero - ) (_, (Pipeline, mkPipelineDecision')) -> SendMsgRequestNextPipelined + startIdling -- on MsgAwaitReply $ requestNext kis mkPipelineDecision' @@ -955,9 +987,10 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = (Succ n', (CollectOrPipeline, mkPipelineDecision')) -> CollectResponse - ( Just + ( Just $ pure $ SendMsgRequestNextPipelined + startIdling -- on MsgAwaitReply $ requestNext kis mkPipelineDecision' @@ -979,6 +1012,7 @@ knownIntersectionStateTop cfgEnv dynEnv intEnv = -> Consensus (ClientStNext n) blk m handleNext kis mkPipelineDecision n = ClientStNext { recvMsgRollForward = \hdr theirTip -> do + stopIdling traceWith tracer $ TraceDownloadedHeader hdr continueWithState kis $ rollForward @@ -988,6 +1022,7 @@ knownIntersectionStateTop cfgEnv dynEnv 
intEnv = (Their theirTip) , recvMsgRollBackward = \intersection theirTip -> do + stopIdling let intersection' :: Point blk intersection' = castPoint intersection traceWith tracer $ TraceRolledBack intersection' diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs index 5894c5663f..dd54c7087a 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs @@ -41,6 +41,7 @@ data ChainDbArgs f m blk = ChainDbArgs { cdbHasFSImmutableDB :: SomeHasFS m , cdbHasFSVolatileDB :: SomeHasFS m , cdbHasFSLgrDB :: SomeHasFS m + , cdbHasFSGsmDB :: SomeHasFS m -- Policy , cdbImmutableDbValidation :: ImmutableDB.ValidationPolicy @@ -93,6 +94,7 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs { -- 'cdbsGcInterval'. , cdbsRegistry :: HKD f (ResourceRegistry m) , cdbsTracer :: Tracer m (TraceEvent blk) + , cdbsHasFSGsmDB :: SomeHasFS m } -- | Default arguments @@ -117,14 +119,18 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs { -- have, because of batching) < the number of blocks sync in @gcInterval@. -- E.g., when syncing at 1k-2k blocks/s, this means 10k-20k blocks. During -- normal operation, we receive 1 block/20s, meaning at most 1 block. -defaultSpecificArgs :: Monad m => ChainDbSpecificArgs Defaults m blk -defaultSpecificArgs = ChainDbSpecificArgs { +defaultSpecificArgs :: + Monad m + => (RelativeMountPoint -> SomeHasFS m) + -> ChainDbSpecificArgs Defaults m blk +defaultSpecificArgs mkFS = ChainDbSpecificArgs { cdbsBlocksToAddSize = 10 , cdbsCheckInFuture = NoDefault , cdbsGcDelay = secondsToDiffTime 60 , cdbsGcInterval = secondsToDiffTime 10 , cdbsRegistry = NoDefault , cdbsTracer = nullTracer + , cdbsHasFSGsmDB = mkFS $ RelativeMountPoint "gsm" } -- | Default arguments @@ -141,7 +147,7 @@ defaultArgs mkFS = toChainDbArgs (ImmutableDB.defaultArgs immFS) (VolatileDB.defaultArgs volFS) (LgrDB.defaultArgs lgrFS) - defaultSpecificArgs + (defaultSpecificArgs mkFS) where immFS, volFS, lgrFS :: SomeHasFS m @@ -193,6 +199,7 @@ fromChainDbArgs ChainDbArgs{..} = ( , cdbsGcInterval = cdbGcInterval , cdbsCheckInFuture = cdbCheckInFuture , cdbsBlocksToAddSize = cdbBlocksToAddSize + , cdbsHasFSGsmDB = cdbHasFSGsmDB } ) @@ -214,6 +221,7 @@ toChainDbArgs ImmutableDB.ImmutableDbArgs {..} cdbHasFSImmutableDB = immHasFS , cdbHasFSVolatileDB = volHasFS , cdbHasFSLgrDB = lgrHasFS + , cdbHasFSGsmDB = cdbsHasFSGsmDB -- Policy , cdbImmutableDbValidation = immValidationPolicy , cdbVolatileDbValidation = volValidationPolicy diff --git a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs index a638683ad7..5a2b4635ea 100644 --- a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs +++ b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs @@ -45,6 +45,7 @@ data NodeDBs db = NodeDBs { nodeDBsImm :: db , nodeDBsVol :: db , nodeDBsLgr :: db + , nodeDBsGsm :: db } deriving (Functor, Foldable, Traversable) @@ -53,6 +54,7 @@ emptyNodeDBs = NodeDBs <$> uncheckedNewTVarM Mock.empty <*> uncheckedNewTVarM Mock.empty <*> uncheckedNewTVarM Mock.empty + <*> uncheckedNewTVarM Mock.empty -- | Minimal set of arguments for creating a ChainDB instance for testing purposes. 
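The 'cdbHasFSGsmDB' field added above defaults to a "gsm" mount point next to the immutable, volatile and ledger DB filesystems, so with the standard arguments the marker file lives at <ChainDB root>/gsm/CaughtUpMarker. The Node.hs hunk earlier in this patch turns that filesystem into the GSM's 'MarkerFileView'; restated as a sketch (the helper name is invented, and the qualified names follow the imports Node.hs already uses):

-- Build the GSM's marker-file view from the finalised ChainDB arguments.
mkGsmMarkerFileView ::
     IOLike m
  => ChainDB m blk
  -> ChainDbArgs Identity m blk
  -> GSM.MarkerFileView m
mkGsmMarkerFileView chainDB args =
  case ChainDB.cdbHasFSGsmDB args of
    SomeHasFS hasFS -> GSM.realMarkerFileView chainDB hasFS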
data MinimalChainDbArgs m blk = MinimalChainDbArgs { @@ -82,6 +84,8 @@ fromMinimalChainDbArgs MinimalChainDbArgs {..} = ChainDbArgs { cdbHasFSImmutableDB = SomeHasFS $ simHasFS (nodeDBsImm mcdbNodeDBs') , cdbHasFSVolatileDB = SomeHasFS $ simHasFS (nodeDBsVol mcdbNodeDBs') , cdbHasFSLgrDB = SomeHasFS $ simHasFS (nodeDBsLgr mcdbNodeDBs') + , cdbHasFSGsmDB = SomeHasFS $ simHasFS (nodeDBsGsm mcdbNodeDBs') + , cdbImmutableDbValidation = ImmutableDB.ValidateAllChunks , cdbVolatileDbValidation = VolatileDB.ValidateAll , cdbMaxBlocksPerFile = VolatileDB.mkBlocksPerFile 4 diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/BlockFetch/Client.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/BlockFetch/Client.hs index 3213a67f2f..663ba8e7f3 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/BlockFetch/Client.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/BlockFetch/Client.hs @@ -24,6 +24,7 @@ module Test.Consensus.MiniProtocol.BlockFetch.Client (tests) where import Control.Monad (replicateM) import Control.Monad.Class.MonadTime +import Control.Monad.Class.MonadTimer.SI (MonadTimer) import Control.Monad.IOSim (runSimOrThrow) import Control.Tracer (Tracer (..), nullTracer, traceWith) import Data.Bifunctor (first) @@ -119,7 +120,7 @@ data BlockFetchClientOutcome = BlockFetchClientOutcome { runBlockFetchTest :: forall m. - (IOLike m, MonadTime m) + (IOLike m, MonadTime m, MonadTimer m) => BlockFetchClientTestSetup -> m BlockFetchClientOutcome runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs index d46ce7112a..e5411d8d84 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -346,7 +346,8 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) (ServerUpdates serverUpdates) -- Set up the client - varCandidates <- uncheckedNewTVarM Map.empty + varCandidates <- uncheckedNewTVarM mempty + varIdlers <- uncheckedNewTVarM mempty varClientState <- uncheckedNewTVarM Genesis varClientResult <- uncheckedNewTVarM Nothing varKnownInvalid <- uncheckedNewTVarM mempty @@ -401,10 +402,11 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) -- client's and server's clock as the tolerable clock skew. 
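Because 'bracketChainSyncClient' now maintains the 'varIdlers' set (allocated with 'uncheckedNewTVarM mempty' above), a test, or the GSM itself, can ask at any point whether a peer's latest response was MsgAwaitReply and has been fully processed. A minimal sketch of such a query (the name is invented):

-- True iff the peer is currently in the idler set maintained by
-- 'bracketChainSyncClient'.
peerIsIdling ::
     (IOLike m, Ord peer)
  => StrictTVar m (Set peer)
  -> peer
  -> STM m Bool
peerIsIdling varIdlers peer = Set.member peer <$> readTVar varIdlers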
client :: StrictTVar m (AnchoredFragment (Header TestBlock)) + -> (m (), m ()) -> Consensus ChainSyncClientPipelined TestBlock m - client varCandidate = + client varCandidate (startIdling, stopIdling) = chainSyncClient ConfigEnv { chainDbView @@ -419,6 +421,8 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) , controlMessageSTM = return Continue , headerMetricsTracer = nullTracer , varCandidate + , startIdling + , stopIdling } -- Set up the server @@ -489,13 +493,14 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) chainSyncTracer chainDbView varCandidates + varIdlers serverId - maxBound $ \varCandidate -> do + maxBound $ \varCandidate idlingSignals -> do atomically $ modifyTVar varFinalCandidates $ Map.insert serverId varCandidate result <- runPipelinedPeer protocolTracer codecChainSyncId clientChannel $ - chainSyncClientPeerPipelined $ client varCandidate + chainSyncClientPeerPipelined $ client varCandidate idlingSignals atomically $ writeTVar varClientResult (Just (ClientFinished result)) return () `catchAlsoLinked` \ex -> do From f09fa3cbd1ee0755d336fedb6d21c466036d5795 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Wed, 14 Feb 2024 13:35:00 -0800 Subject: [PATCH 4/7] consensus-diffusion: partial vendor of QSM for IOSim in GSM test --- .../ouroboros-consensus-diffusion.cabal | 3 + .../IOSimQSM/Test/StateMachine/Sequential.hs | 176 ++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/IOSimQSM/Test/StateMachine/Sequential.hs diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index 7277d6b2fa..36c998fb66 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -222,6 +222,7 @@ test-suite consensus-test Test.Consensus.HardFork.Combinator Test.Consensus.HardFork.Combinator.A Test.Consensus.HardFork.Combinator.B + Test.Consensus.IOSimQSM.Test.StateMachine.Sequential Test.Consensus.Network.AnchoredFragment.Extras Test.Consensus.Network.Driver.Limits.Extras Test.Consensus.Node @@ -257,7 +258,9 @@ test-suite consensus-test , ouroboros-network-framework , ouroboros-network-mock , ouroboros-network-protocols + , pretty-show , QuickCheck + , quickcheck-state-machine , quiet , serialise , si-timers diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/IOSimQSM/Test/StateMachine/Sequential.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/IOSimQSM/Test/StateMachine/Sequential.hs new file mode 100644 index 0000000000..8b553b6145 --- /dev/null +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/IOSimQSM/Test/StateMachine/Sequential.hs @@ -0,0 +1,176 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +----------------------------------------------------------------------------- +-- | +-- +-- +-- /___This is a superficial variation of the quickcheck-state-machine___/ +-- /___source file that uses @io-classes@ instead of @MonadIO@, @UnliftIO@,___/ +-- /___etc.___/ Perhaps +-- +-- will supplant this. 
+-- +-- +-- +-- Module : Test.Consensus.IOSimQSM.Test.StateMachine.Sequential +-- Copyright : (C) 2017, ATS Advanced Telematic Systems GmbH +-- License : BSD-style (see the file LICENSE) +-- +-- Maintainer : Stevan Andjelkovic +-- Stability : provisional +-- Portability : non-portable (GHC extensions) +-- +-- This module contains helpers for generating, shrinking, and checking +-- sequential programs. +-- +----------------------------------------------------------------------------- + +module Test.Consensus.IOSimQSM.Test.StateMachine.Sequential (runCommands') where + +import Control.Concurrent.Class.MonadSTM.TChan (TChan, newTChanIO, + tryReadTChan, writeTChan) +import Control.Exception (SomeAsyncException (..), SomeException, + displayException, fromException) +import Control.Monad (when) +import Control.Monad.Class.MonadSay (MonadSay, say) +import Control.Monad.State.Strict (StateT, get, lift, put, runStateT) +import Data.Dynamic (Dynamic, toDyn) +import Data.Either (fromRight) +import Data.Maybe (fromMaybe) +import qualified Data.Set as S +import Ouroboros.Consensus.Util.IOLike (ExitCase (..), IOLike, + MonadCatch (..), atomically, catch, throwIO) +import Test.StateMachine.Logic +import Test.StateMachine.Types +import qualified Test.StateMachine.Types.Rank2 as Rank2 +import Test.StateMachine.Utils +import Text.Show.Pretty (ppShow) + +------------------------------------------------------------------------ + +runCommands' :: (Show (cmd Concrete), Show (resp Concrete)) + => (Rank2.Traversable cmd, Rank2.Foldable resp) + => (IOLike m, MonadSay m) + => m (StateMachine model cmd m resp) + -> Commands cmd resp + -> m (History cmd resp, model Concrete, Reason) +runCommands' msm cmds = do + hchan <- newTChanIO + (reason, (_, _, _, model)) <- + fst <$> generalBracket + msm + (\sm' ec -> case ec of + ExitCaseSuccess (_, (_,_,_,model)) -> cleanup sm' model + _ -> getChanContents hchan >>= cleanup sm' . mkModel sm' . History + ) + (\sm'@StateMachine{ initModel } -> runStateT + (executeCommands sm' hchan (Pid 0) CheckEverything cmds) + (emptyEnvironment, initModel, newCounter, initModel)) + hist <- getChanContents hchan + return (History hist, model, reason) + +-- We should try our best to not let this function fail, +-- since it is used to cleanup resources, in parallel programs. +getChanContents :: IOLike m => TChan m a -> m [a] +getChanContents chan = reverse <$> atomically (go' []) + where + go' acc = do + mx <- tryReadTChan chan + case mx of + Just x -> go' (x : acc) + Nothing -> return acc + +data Check + = CheckPrecondition + | CheckEverything + +executeCommands :: (Show (cmd Concrete), Show (resp Concrete)) + => (Rank2.Traversable cmd, Rank2.Foldable resp) + => (MonadSay m, IOLike m) + => StateMachine model cmd m resp + -> TChan m (Pid, HistoryEvent cmd resp) + -> Pid + -> Check + -> Commands cmd resp + -> StateT (Environment, model Symbolic, Counter, model Concrete) m Reason +executeCommands StateMachine {..} hchan pid check = + go . 
unCommands + where + go [] = return Ok + go (Command scmd _ vars : cmds) = do + (env, smodel, counter, cmodel) <- get + case (check, logic (precondition smodel scmd)) of + (CheckPrecondition, VFalse ce) -> return (PreconditionFailed (show ce)) + (CheckEverything, VFalse ce) -> return (PreconditionFailed (show ce)) + _otherwise -> do + let ccmd = fromRight (error "executeCommands: impossible") (reify env scmd) + lift $ atomically (writeTChan hchan (pid, Invocation ccmd (S.fromList vars))) + !ecresp <- lift $ fmap Right (semantics ccmd) `catch` + \(err :: SomeException) -> do + when (isSomeAsyncException err) (say (displayException err) >> throwIO err) + return (Left (displayException err)) + case ecresp of + Left err -> do + lift $ atomically (writeTChan hchan (pid, Exception err)) + return $ ExceptionThrown err + Right cresp -> do + let cvars = getUsedConcrete cresp + if length vars /= length cvars + then do + let err = mockSemanticsMismatchError (ppShow ccmd) (ppShow vars) (ppShow cresp) (ppShow cvars) + lift $ atomically (writeTChan hchan (pid, Response cresp)) + return $ MockSemanticsMismatch err + else do + lift $ atomically (writeTChan hchan (pid, Response cresp)) + case (check, logic (postcondition cmodel ccmd cresp)) of + (CheckEverything, VFalse ce) -> return (PostconditionFailed (show ce)) + _otherwise -> + case (check, logic (fromMaybe (const Top) invariant cmodel)) of + (CheckEverything, VFalse ce') -> return (InvariantBroken (show ce')) + _otherwise -> do + let (sresp, counter') = runGenSym (mock smodel scmd) counter + put ( insertConcretes vars cvars env + , transition smodel scmd sresp + , counter' + , transition cmodel ccmd cresp + ) + go cmds + + isSomeAsyncException :: SomeException -> Bool + isSomeAsyncException se = case fromException se of + Just (SomeAsyncException _) -> True + _ -> False + + mockSemanticsMismatchError :: String -> String -> String -> String -> String + mockSemanticsMismatchError cmd svars cresp cvars = unlines + [ "" + , "Mismatch between `mock` and `semantics`." + , "" + , "The definition of `mock` for the command:" + , "" + , " ", cmd + , "" + , "returns the following references:" + , "" + , " ", svars + , "" + , "while the response from `semantics`:" + , "" + , " ", cresp + , "" + , "returns the following references:" + , "" + , " ", cvars + , "" + , "Continuing to execute commands at this point could result in scope" + , "errors, because we might have commands that use references (returned" + , "by `mock`) that are not available (returned by `semantics`)." 
+ , "" + ] + +getUsedConcrete :: Rank2.Foldable f => f Concrete -> [Dynamic] +getUsedConcrete = Rank2.foldMap (\(Concrete x) -> [toDyn x]) From 5bb318dbc7a3a6e5af759aaf33b78a56376eeb60 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Wed, 14 Feb 2024 13:35:59 -0800 Subject: [PATCH 5/7] consensus-diffusion: a QSM IOSim test of the GSM --- .../ouroboros-consensus-diffusion.cabal | 6 +- .../test/consensus-test/Main.hs | 2 + .../test/consensus-test/Test/Consensus/GSM.hs | 406 +++++++++++ .../Test/Consensus/GSM/Model.hs | 660 ++++++++++++++++++ 4 files changed, 1073 insertions(+), 1 deletion(-) create mode 100644 ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs create mode 100644 ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM/Model.hs diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index 36c998fb66..997fe356d2 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -219,6 +219,8 @@ test-suite consensus-test Test.Consensus.Genesis.Setup.GenChains Test.Consensus.Genesis.Tests Test.Consensus.Genesis.Tests.LongRangeAttack + Test.Consensus.GSM + Test.Consensus.GSM.Model Test.Consensus.HardFork.Combinator Test.Consensus.HardFork.Combinator.A Test.Consensus.HardFork.Combinator.B @@ -260,11 +262,12 @@ test-suite consensus-test , ouroboros-network-protocols , pretty-show , QuickCheck - , quickcheck-state-machine + , quickcheck-state-machine:no-vendored-treediff , quiet , serialise , si-timers , sop-extras + , strict-checked-vars , strict-sop-core , strict-stm , tasty @@ -272,6 +275,7 @@ test-suite consensus-test , tasty-quickcheck , temporary , time + , tree-diff , typed-protocols , typed-protocols-examples , unstable-diffusion-testlib diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Main.hs b/ouroboros-consensus-diffusion/test/consensus-test/Main.hs index c4b2443ff6..ab52fd83b3 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Main.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Main.hs @@ -1,6 +1,7 @@ module Main (main) where import qualified Test.Consensus.Genesis.Tests (tests) +import qualified Test.Consensus.GSM (tests) import qualified Test.Consensus.HardFork.Combinator (tests) import qualified Test.Consensus.Node (tests) import Test.Tasty @@ -20,4 +21,5 @@ tests = ] ] , Test.Consensus.Genesis.Tests.tests + , testGroup "GSM" Test.Consensus.GSM.tests ] diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs new file mode 100644 index 0000000000..27384a4552 --- /dev/null +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM.hs @@ -0,0 +1,406 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE Rank2Types #-} + +module Test.Consensus.GSM (tests) where + +import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked +import Control.Monad (replicateM_) +import Control.Monad.Class.MonadAsync (poll, withAsync) +import Control.Monad.Class.MonadFork (MonadFork, yield) +import Control.Monad.Class.MonadSTM +import qualified Control.Monad.Class.MonadTime.SI as SI +import qualified Control.Monad.Class.MonadTimer.SI as SI +import qualified Control.Monad.IOSim as IOSim +import Control.Tracer (Tracer (Tracer)) +import Data.Functor ((<&>)) +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import 
qualified Ouroboros.Consensus.Node.GSM as GSM +import Ouroboros.Consensus.Util.IOLike (IOLike) +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement (..)) +import Test.Consensus.GSM.Model +import Test.Consensus.IOSimQSM.Test.StateMachine.Sequential + (runCommands') +import qualified Test.QuickCheck as QC +import qualified Test.QuickCheck.Monadic as QC +import qualified Test.StateMachine as QSM +import Test.StateMachine (Concrete) +import qualified Test.StateMachine.Types as QSM +import Test.Tasty (TestTree) +import Test.Tasty.QuickCheck (testProperty) +import Test.Util.Orphans.IOLike () +import Test.Util.TestEnv (adjustQuickCheckTests) +import Test.Util.ToExpr () + +----- + +tests :: [TestTree] +tests = + adhoc <> core + where + adhoc = [ + testProperty "GSM yield regression" prop_yield_regression + ] + + core = [ + adjustQuickCheckTests (* 10) + $ testProperty ("GSM (" <> coreTestName ub <> ")") + $ prop_sequential ub + | ub <- Nothing : map Just [minBound .. maxBound :: UpstreamPeer] + ] + + coreTestName = \case + Nothing -> "no peers" + Just ub -> "at most " <> case fromEnum ub of + 0 -> "1 peer" + n -> show (n + 1) <> " peers" + +----- the QSM code under test + +-- These definitions are in the exact same order as the QSM tutorial at +-- , +-- except the model definition is in "Test.Consensus.GSM.Model". + +semantics :: + IOLike m + => Vars m + -> Command Concrete + -> m (Response Concrete) +semantics vars cmd = pre $ case cmd of + Disconnect peer -> do + atomically $ do + modifyTVar varCandidates $ Map.delete peer + modifyTVar varIdlers $ Set.delete peer + pure Unit + ExtendSelection sdel -> do + atomically $ do + Selection b s <- readTVar varSelection + writeTVar varSelection $! Selection (b + 1) (s + sdel) + pure Unit + ModifyCandidate peer bdel -> do + atomically $ do + + modifyTVar varIdlers $ Set.delete peer + + v <- (Map.! peer) <$> readTVar varCandidates + Candidate b <- readTVar v + writeTVar v $! Candidate (b + bdel) + + pure Unit + NewCandidate peer bdel -> do + atomically $ do + Selection b _s <- readTVar varSelection + v <- newTVar $! 
Candidate (b + bdel) + modifyTVar varCandidates $ Map.insert peer v + pure Unit + ReadJudgment -> do + fmap ReadThisJudgment $ atomically $ readTVar varJudgment + ReadMarker -> do + fmap ReadThisMarker $ atomically $ readTVar varMarker + StartIdling peer -> do + atomically $ modifyTVar varIdlers $ Set.insert peer + pure Unit + TimePasses dur -> do + SI.threadDelay (0.1 * fromIntegral dur) + pure Unit + where + Vars + varSelection + varCandidates + varIdlers + varJudgment + varMarker + varEvents + = vars + + pre m = do + push varEvents (EvBegin cmd) + x <- m + yieldSeveralTimes -- see Note [Why yield after the command] + push varEvents EvEnd + pure x + +sm :: + IOLike m + => Maybe UpstreamPeer + -> Vars m + -> LedgerStateJudgement + -> QSM.StateMachine Model Command m Response +sm ub vars j = QSM.StateMachine { + QSM.cleanup = \_model -> pure () + , + QSM.generator = generator ub + , + QSM.initModel = initModel j + , + QSM.invariant = Nothing + , + QSM.mock = \model cmd -> pure $ mock model cmd + , + QSM.postcondition = postcondition + , + QSM.precondition = precondition + , + QSM.semantics = semantics vars + , + QSM.shrinker = shrinker + , + QSM.transition = transition + } + +prop_sequential :: + Maybe UpstreamPeer + -> LedgerStateJudgement + -> QC.Property +prop_sequential ub j0 = + QSM.forAllCommands + commandArbitrarySM + mbMinimumCommandLen + (prop_sequential1 j0) + where + mbMinimumCommandLen = Just 20 -- just a guess + + -- NB the monad is irrelevant here but is ambiguous, so we merely ascribe a + -- convenient concrete one + commandArbitrarySM = sm ub (undefined :: Vars IO) j0 + +prop_sequential1 :: + LedgerStateJudgement + -> QSM.Commands Command Response + -> QC.Property +prop_sequential1 j0 cmds = runSimQC $ do + -- these variables are part of the 'GSM.GsmView' + varSelection <- newTVarIO (mSelection $ initModel j0) + varCandidates <- newTVarIO Map.empty + varIdlers <- newTVarIO Set.empty + varJudgment <- newTVarIO j0 + varMarker <- newTVarIO (toMarker j0) + + -- this variable is for better 'QC.counterexample' messages + varEvents <- newRecorder + let tracer = Tracer $ push varEvents . EvGsmWrite . 
\case + GSM.GsmEventEnterCaughtUp{} -> YoungEnough + GSM.GsmEventLeaveCaughtUp{} -> TooOld + + let vars = + Vars + varSelection + varCandidates + varIdlers + varJudgment + varMarker + varEvents + + let executionSM = sm (Just maxBound) vars j0 + + -- NB the specific IO type is unused here + prettySM = sm undefined undefined j0 + + let gsm = GSM.realGsmEntryPoints (id, tracer) GSM.GsmView { + GSM.antiThunderingHerd = Nothing + , + GSM.candidateOverSelection = candidateOverSelection + , + GSM.durationUntilTooOld = Just durationUntilTooOld + , + GSM.equivalent = (==) -- unsound, but harmless in this test + , + GSM.getChainSyncCandidates = readTVar varCandidates + , + GSM.getChainSyncIdlers = readTVar varIdlers + , + GSM.getCurrentSelection = readTVar varSelection + , + GSM.minCaughtUpDuration = thrashLimit + , + GSM.setCaughtUpPersistentMark = \b -> + atomically $ do + writeTVar varMarker $ if b then Present else Absent + , + GSM.writeLedgerStateJudgement = \x -> atomically $ do + writeTVar varJudgment x + } + gsmEntryPoint = case j0 of + TooOld -> GSM.enterOnlyBootstrap gsm + YoungEnough -> GSM.enterCaughtUp gsm + + ((hist, model', res), mbExn) <- id + $ withAsync gsmEntryPoint + $ \hGSM -> do + + yieldSeveralTimes -- see Note [Why yield after the command] + + x <- runCommands' (pure executionSM) cmds + + -- notice if the GSM thread raised an exception while processing the + -- commands + poll hGSM <&> \case + Just Right{} -> + error "impossible! GSM terminated" + Just (Left exn) -> + -- we don't simply rethrow it, since we still want to pretty print + -- the command sequence + (x, Just exn) + Nothing -> + (x, Nothing) + + let noExn = case mbExn of + Nothing -> QC.property () + Just exn -> QC.counterexample (show exn) False + + -- effectively add a 'ReadJudgment' to the end of the command list + lastCheck <- do + actual <- semantics vars ReadJudgment + let expected = mock model' ReadJudgment + pure $ case (actual, expected) of + (ReadThisJudgment x, ReadThisJudgment y) -> + QC.counterexample "lastCheck" $ x QC.=== y + _ -> + error "impossible! lastCheck response" + + watcherEvents <- dumpEvents varEvents + + pure + $ QC.monadicIO + $ QSM.prettyCommands prettySM hist + $ QC.counterexample + (unlines + $ (:) "WATCHER" + $ map ((" " <>) . show) + $ watcherEvents + ) + $ QC.tabulate + "Notables" + (case Set.toList $ mNotables model' of + [] -> [""] + notables -> map show notables + ) + $ QSM.checkCommandNames cmds + $ noExn QC..&&. lastCheck QC..&&. res QC.=== QSM.Ok + +----- + +durationUntilTooOld :: Selection -> IOSim.IOSim s GSM.DurationFromNow +durationUntilTooOld sel = do + let expiryAge = ageLimit `SI.addTime` onset sel + now <- SI.getMonotonicTime + pure $ case compare expiryAge now of + LT -> GSM.Already + GT -> GSM.After $ realToFrac $ expiryAge `SI.diffTime` now + + -- 'boringDur' cannot prevent this case. In particular, this case + -- necessarily arises in the GSM itself during a 'TimePasses' that + -- incurs a so-called /flicker/ event, in which the anti-thrashing + -- timer expires and yet the node state at that moment still + -- _immediately_ indicates that it's CaughtUp. For the specific case of + -- this test suite, the answer here must be 'GSM.Already'. + EQ -> GSM.Already + +----- + +-- | Ensure the GSM thread's transactions quiesce +-- +-- I'm unsure how many 'yield's are actually necessary, but ten is both small +-- and also seems likely to suffice. +-- +-- Despite the crudeness, this seems much more compositional than invasive +-- explicit synchronization. 
+yieldSeveralTimes :: MonadFork m => m () +yieldSeveralTimes = replicateM_ 10 yield + +{- + +Note [Why yield after the command] + +For this 'prop_sequential1' repro + +@ + YoungEnough + + Command (NewCandidate Amara (B 1)) Unit [] + Command (StartIdling Amara) Unit [] + Command (TimePasses 61) Unit [] + Command (ExtendSelection (S (-4))) Unit [] + Command ReadMarker (ReadThisMarker Absent) [] + + (Command ReadJudgment _ []) -- this last command is baked into the property +@ + +If we yield after the command, then both GSM flicker writes happen during the +'ExtendSelection'. + +If we yield before the command, then both GSM flicker writes happen during the +'ReadMarker'. + +If we don't yield, one write happens during the ReadMarker and the other +happens /between/ 'ReadMarker' and 'ReadJudgment'. + +It seems most intuitive for the updates to happen "as part of" the +'ExtendSelection', so I'm favoring yielding after. + +And since we're yielding after the command, we should also yield before the +first command, for consistency. + +-} + +-- | Test the example from the Note [Why yield after the command] +-- +-- This property fails when 'yieldSeveralTimes' is removed/redefined to @pure +-- ()@. +prop_yield_regression :: QC.Property +prop_yield_regression = + QC.once + $ prop_sequential1 YoungEnough + $ QSM.Commands + [ QSM.Command (NewCandidate Amara (B 1)) Unit [] + , QSM.Command (StartIdling Amara) Unit [] + , QSM.Command (TimePasses 61) Unit [] + , QSM.Command (ExtendSelection (S (-4))) Unit [] + , QSM.Command ReadMarker (ReadThisMarker Absent) [] + ] + +----- trivial event accumulator, useful for debugging test failures + +data Ev = + EvBegin (Command Concrete) + -- ^ 'semantics' started stimulating the GSM code being tested + | + EvEnd + -- ^ 'semantics' stopped stimulating the GSM code being tested + | + EvGsmWrite LedgerStateJudgement + -- ^ the GSM code being tested wrote to the TVar + deriving (Show) + +newtype EvRecorder m = EvRecorder (StrictTVar m [(SI.Time, Ev)]) + +newRecorder :: IOLike m => m (EvRecorder m) +newRecorder = EvRecorder <$> newTVarIO [] + +dumpEvents :: IOLike m => EvRecorder m -> m [(SI.Time, Ev)] +dumpEvents (EvRecorder var) = reverse <$> readTVarIO var + +push :: IOLike m => EvRecorder m -> Ev -> m () +push (EvRecorder var) ev = do + now <- SI.getMonotonicTime + atomically $ modifyTVar var $ (:) (now, ev) + +----- + +-- | merely a tidy bundle of arguments +data Vars m = Vars + (StrictTVar m Selection) + (StrictTVar m (Map.Map UpstreamPeer (StrictTVar m Candidate))) + (StrictTVar m (Set.Set UpstreamPeer)) + (StrictTVar m LedgerStateJudgement) + (StrictTVar m MarkerState) + (EvRecorder m) + +----- + +-- | a straight-forwardtrivial alias +runSimQC :: (forall s. 
IOSim.IOSim s QC.Property) -> QC.Property +runSimQC m = case IOSim.runSim m of + Left failure -> QC.counterexample (show failure) False + Right prop -> prop diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM/Model.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM/Model.hs new file mode 100644 index 0000000000..686f1c5a6b --- /dev/null +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/GSM/Model.hs @@ -0,0 +1,660 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE StandaloneKindSignatures #-} + +{-# OPTIONS_GHC -Wno-orphans #-} + +-- | The definition of the GSM QSM model and its auxiliaries + +module Test.Consensus.GSM.Model (module Test.Consensus.GSM.Model) where + +import qualified Control.Monad.Class.MonadTime.SI as SI +import Data.Kind (Type) +import Data.List ((\\)) +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import Data.Time (diffTimeToPicoseconds) +import qualified Data.TreeDiff as TD +import GHC.Generics (Generic, Generic1) +import qualified Ouroboros.Consensus.Node.GSM as GSM +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement (..)) +import qualified Test.QuickCheck as QC +import Test.QuickCheck (choose, elements, shrink) +import qualified Test.StateMachine as QSM +import Test.StateMachine (Concrete, Symbolic) +import qualified Test.StateMachine.Types.Rank2 as QSM + +----- the QSM model + +-- These definitions are in the exact same order as the QSM tutorial at +-- , +-- except @semantics@ and the property itself are defined in +-- "Test.Consensus.GSM". + +-- | TODO restarts (or maybe just killing the GSM thread) +type Command :: (Type -> Type) -> Type +data Command r = + Disconnect UpstreamPeer + -- ^ INVARIANT must be an existing peer + -- + -- Mocks the necessary ChainSync client behavior. + | + ExtendSelection S + -- ^ INVARIANT 'selectionIsBehind' + -- + -- NOTE Harmless to assume it only advances by @'B' 1@ at a time. + | + ModifyCandidate UpstreamPeer B + -- ^ INVARIANT existing peer + -- + -- Mocks the necessary ChainSync client behavior. + | + NewCandidate UpstreamPeer B + -- ^ INVARIANT new peer + -- + -- Mocks the necessary ChainSync client behavior. + | + ReadJudgment + | + ReadMarker + | + StartIdling UpstreamPeer + -- ^ INVARIANT existing peer, not idling + | + TimePasses Int + -- ^ tenths of a second + -- + -- INVARIANT positive + -- + -- INVARIANT does not end /exactly/ on an interesting time point; see + -- 'boringDur'. + -- + -- NOTE The generator does not yield consecutive 'TimePasses' commands, + -- though shrinking might. + -- + + -- DRAFT NOTE An earlier design attempted to prevent TimePasses from ever landing + -- exactly on an interesting moment, ie when some timeout expires. Notably: + -- when a slot becomes too old or when the anti-thrashing limit expires. + -- This is feasible and its complexity is tolerable for the generators. + -- However, shrinking------wait, doesn't precondition guard against it? 
+ deriving stock (Generic1, Read, Show) + deriving anyclass (QSM.CommandNames, QSM.Foldable, QSM.Functor, QSM.Traversable) + +type Response :: (Type -> Type) -> Type +data Response r = + ReadThisJudgment LedgerStateJudgement + | + ReadThisMarker MarkerState + | + Unit + deriving stock (Generic1, Read, Show) + deriving anyclass (QSM.Foldable, QSM.Functor, QSM.Traversable) + +type Model :: (Type -> Type) -> Type +data Model r = Model { + mCandidates :: Map.Map UpstreamPeer Candidate + , + mClock :: SI.Time + , + mIdlers :: Set.Set UpstreamPeer + , + mNotables :: Set.Set Notable + , + mPrev :: WhetherPrevTimePasses + , + mSelection :: Selection + , + mState :: ModelState + } + deriving (Generic, Show) + deriving anyclass (TD.ToExpr) + +addNotableWhen :: Notable -> Bool -> Model r -> Model r +addNotableWhen n b model = + if not b then model else + model { mNotables = n `Set.insert` mNotables model } + +initModel :: LedgerStateJudgement -> Model r +initModel j = Model { + mCandidates = Map.empty + , + mClock = SI.Time 0 + , + mIdlers = Set.empty + , + mNotables = Set.empty + , + mPrev = WhetherPrevTimePasses True + , + mSelection = Selection 0 s + , + mState = case j of + TooOld -> ModelTooOld + YoungEnough -> ModelYoungEnough (SI.Time (-10000)) + } + where + s = S $ case j of + TooOld -> (-11) + YoungEnough -> 0 + +-- The extra expressivity of 'QSM.Logic' beyond 'Bool' will not be useful in +-- this test module as-is, since we only run commands (via 'runCommands'') that +-- the library generated, which ensures the preconditions. On the other hand, +-- if you're debugging a failure by manually altering commands, then these +-- annotations may be helpful. +precondition :: Model Symbolic -> Command Symbolic -> QSM.Logic +precondition model = pre $ \case + cmd@ExtendSelection{} -> + let model' = transition model cmd Unit + in + "syncing node got ahead" `atom` selectionIsBehind model + QSM..&& + "early selection" `atom` selectionIsNotEarly model' + QSM..&& + boringDur model' 0 + Disconnect peer -> + "double disconnect" `atom` (peer `Map.member` cands) + ModifyCandidate peer _bdel -> + "modify after disconnect" `atom` (peer `Map.member` cands) + NewCandidate peer _bdel -> + "double connect" `atom` (peer `Map.notMember` cands) + ReadJudgment -> + QSM.Top + ReadMarker -> + QSM.Top + StartIdling peer -> + "idle after disconnect" `atom` (peer `Map.member` cands) + QSM..&& + "double idle" `atom` (peer `Set.notMember` idlers) + TimePasses dur -> + "non-positive duration" `atom` (0 < dur) + QSM..&& + boringDur model dur + where + Model { + mCandidates = cands + , + mIdlers = idlers + } = model + + pre f cmd = f cmd QSM..// show cmd + +transition :: Model r -> Command r -> Response r -> Model r +transition model cmd resp = fixupModelState cmd $ case (cmd, resp) of + (Disconnect peer, Unit) -> + model' { + mCandidates = Map.delete peer cands + , + mIdlers = Set.delete peer idlers + } + (ExtendSelection sdel, Unit) -> + model' { mSelection = Selection (b + 1) (s + sdel) } + (ModifyCandidate peer bdel, Unit) -> + model' { + mCandidates = Map.insertWith plusC peer (Candidate bdel) cands + , + mIdlers = Set.delete peer idlers + } + (NewCandidate peer bdel, Unit) -> + model' { mCandidates = Map.insert peer (Candidate (b + bdel)) cands } + (ReadJudgment, ReadThisJudgment{}) -> + model' + (ReadMarker, ReadThisMarker{}) -> + model' + (StartIdling peer, Unit) -> + model' { mIdlers = Set.insert peer idlers } + (TimePasses dur, Unit) -> + addNotableWhen BigDurN (dur > 300) + $ model { + mClock = SI.addTime (0.1 * 
fromIntegral dur) clk + , + mPrev = WhetherPrevTimePasses True + } + o -> error $ "impossible response: " <> show o + where + Model { + mCandidates = cands + , + mClock = clk + , + mIdlers = idlers + , + mSelection = Selection b s + } = model + + model' = model { mPrev = WhetherPrevTimePasses False } + + plusC (Candidate x) (Candidate y) = Candidate (x + y) + +-- | Update the 'mState', assuming that's the only stale field in the given +-- 'Model' +fixupModelState :: Command r -> Model r -> Model r +fixupModelState cmd model = + case st of + ModelTooOld + | caughtUp -> + -- ASSUMPTION This new state was /NOT/ incurred by the 'TimePasses' + -- command. + -- + -- Therefore the current clock is necessarily the correct timestamp + -- to record. + addNotableWhen CaughtUpN True + $ model { mState = ModelYoungEnough clk } + | otherwise -> + model + ModelYoungEnough timestamp + | flicker timestamp -> + addNotableWhen FlickerN True + $ model { mState = ModelYoungEnough (flickerTimestamp timestamp) } + | fellBehind timestamp -> + addNotableWhen FellBehindN True + $ model { mState = ModelTooOld } + | otherwise -> + -- NB in this branch, these notables are mutually exclusive + addNotableWhen TooOldN (expiryAge < clk) + $ addNotableWhen + NotThrashingN + (SI.Time 0 < timestamp && expiryThrashing timestamp < clk) + $ model + where + Model { + mCandidates = cands + , + mClock = clk + , + mIdlers = idlers + , + mSelection = sel + , + mState = st + } = model + + caughtUp = some && allIdling && all ok cands + fellBehind timestamp = expiry timestamp < clk -- NB 'boringDur' prevents == + + flicker timestamp = fellBehind timestamp && caughtUp + + some = 0 < Map.size cands + + allIdling = idlers == Map.keysSet cands + + ok cand = + GSM.WhetherCandidateIsBetter False == candidateOverSelection sel cand + + -- the first time the node would transition to OnlyBootstrap + expiry timestamp = expiryAge `max` expiryThrashing timestamp + expiryAge = SI.addTime ageLimit (onset sel) + expiryThrashing timestamp = SI.addTime thrashLimit timestamp + + -- It's possible for the node to instantly return to CaughtUp, but that + -- might have happened /during/ the 'TimePasses' command, not only when it + -- ends. + -- + -- Therefore the age limit of the selection is the correct timestamp to + -- record, instead of the current clock (ie when the 'TimePasses' ended). + -- + -- NOTE Superficially, in the real implementation, the Diffusion Layer + -- should be discarding all peers when transitioning from CaughtUp to + -- OnlyBootstrap. However, it would be plausible for an implementation to + -- retain any bootstrap peers it happened to have, so the idiosyncratic + -- behavior of the system under test in this module is not totally + -- irrelevant. + -- + -- the /last/ time the node instantaneously visited OnlyBootstrap during + -- the 'TimePasses' command, assuming it did so at least once + flickerTimestamp timestamp = case cmd of + ExtendSelection sdel | sdel < 0 -> + clk + TimePasses{} -> + foldl max (expiry timestamp) + $ takeWhile (< clk) -- NB 'boringDur' prevents == + $ iterate (SI.addTime thrashLimit) (expiry timestamp) + _ -> + error + $ "impossible! 
flicker but neither " + <> + "negative ExtendSelection nor TimePasses: " + <> + show cmd + +postcondition :: + Model Concrete + -> Command Concrete + -> Response Concrete + -> QSM.Logic +postcondition model _cmd = \case + ReadThisJudgment j' -> + j' QSM..== j + ReadThisMarker m' -> + m' QSM..== toMarker j + Unit -> + QSM.Top + where + j = toJudgment $ mState model + +generator :: + Maybe UpstreamPeer + -> Model Symbolic + -> Maybe (QC.Gen (Command Symbolic)) +generator ub model = Just $ QC.frequency $ + [ (,) 5 $ Disconnect <$> elements old | notNull old ] + <> + [ (,) 10 $ ExtendSelection <$> elements sdels + | notNull sdels + , selectionIsBehind model -- NB harmless to assume this node never mints + ] + <> + [ (,) 20 $ do + (peer, bdel) <- elements bdels + ModifyCandidate peer <$> elements bdel + | notNull bdels + ] + <> + [ (,) 100 $ + NewCandidate + <$> elements new + <*> (B <$> choose (-10, 10)) + | notNull new + ] + <> + [ (,) 20 $ pure ReadJudgment ] + <> + [ (,) 20 $ pure ReadMarker ] + <> + [ (,) 50 $ StartIdling <$> elements oldNotIdling | notNull oldNotIdling ] + <> + [ (,) 100 $ TimePasses <$> genTimePassesDur | prev == WhetherPrevTimePasses False ] + where + Model { + mCandidates = cands + , + mClock = clk + , + mIdlers = idlers + , + mPrev = prev + , + mSelection = sel + } = model + + notNull :: [a] -> Bool + notNull = not . null + + old = Map.keys cands + + new = case ub of + Nothing -> [] + Just peer -> [ minBound .. peer ] \\ old + + oldNotIdling = old \\ Set.toList idlers + + genTimePassesDur = QC.frequency $ + [ (,) 10 $ choose (1, 70) ] + <> + [ (,) 1 $ choose (300, 600) + | case mState model of ModelYoungEnough{} -> True; ModelTooOld -> False + ] + + -- sdels that would not cause the selection to be in the future + sdels = + let Selection b s = sel + in + [ sdel + | sdel <- map S [-4 .. 10] + , 0 /= sdel + , onset (Selection b (s + sdel)) <= clk + ] + + -- bdels that keep the candidates' lengths near the selection + bdels = + let Selection b _s = sel + lim = 3 + in + [ (,) + peer + (filter (/= 0) [ b + offset - c | offset <- [-lim .. 
lim] ]) + | (peer, Candidate c) <- Map.assocs cands + ] + + +shrinker :: Model Symbolic -> Command Symbolic -> [Command Symbolic] +shrinker _model = \case + Disconnect{} -> + [] + ExtendSelection sdel -> + [ ExtendSelection sdel' | sdel' <- shrinkS sdel ] + ModifyCandidate peer bdel -> + [ ModifyCandidate peer bdel' | bdel' <- shrinkB bdel, bdel' /= 0 ] + NewCandidate peer bdel -> + [ NewCandidate peer bdel' | bdel' <- shrinkB bdel, bdel' /= 0 ] + ReadJudgment -> + [] + ReadMarker -> + [] + StartIdling{} -> + [] + TimePasses dur -> + [ TimePasses dur' | dur' <- shrink dur, 0 < dur' ] + where + shrinkB (B x) = [ B x' | x' <- shrink x ] + shrinkS (S x) = [ S x' | x' <- shrink x ] + +mock :: Model r -> Command r -> Response r +mock model = \case + Disconnect{} -> + Unit + ExtendSelection{} -> + Unit + ModifyCandidate{} -> + Unit + NewCandidate{} -> + Unit + ReadJudgment -> + ReadThisJudgment j + ReadMarker -> + ReadThisMarker $ toMarker j + StartIdling{} -> + Unit + TimePasses{} -> + Unit + where + j = toJudgment $ mState model + +----- + +-- | A block count +newtype B = B Int + deriving stock (Eq, Ord, Generic, Read, Show) + deriving newtype (Enum, Num) + deriving anyclass (TD.ToExpr) + +-- | A slot count +newtype S = S Int + deriving stock (Eq, Ord, Generic, Read, Show) + deriving newtype (Enum, Num) + deriving anyclass (TD.ToExpr) + +data UpstreamPeer = Amara | Bao | Cait | Dhani | Eric + deriving stock (Bounded, Enum, Eq, Ord, Generic, Read, Show) + deriving anyclass (TD.ToExpr) + +-- | The cumulative growth relative to whatever length the initial selection +-- was and the slot relative to the start of the test (which is assumed to be +-- the exact onset of some slot) +data Selection = Selection !B !S + deriving stock (Eq, Ord, Generic, Show) + deriving anyclass (TD.ToExpr) + +-- | The age of the candidate is irrelevant, only its length matters +newtype Candidate = Candidate B + deriving stock (Eq, Ord, Generic, Show) + deriving anyclass (TD.ToExpr) + +data MarkerState = Present | Absent + deriving stock (Eq, Ord, Generic, Read, Show) + deriving anyclass (TD.ToExpr) + +newtype WhetherPrevTimePasses = WhetherPrevTimePasses Bool + deriving stock (Eq, Ord, Generic, Show) + deriving anyclass (TD.ToExpr) + +data ModelState = + ModelTooOld + | + ModelYoungEnough !SI.Time + -- ^ when the model most recently transitioned to 'YoungEnough' + deriving stock (Eq, Ord, Generic, Show) + deriving anyclass (TD.ToExpr) + +----- + +-- | Interesting events to record /within the model/ +-- +-- TODO some less superficial ones (eg even just combinations of these) +data Notable = + BigDurN + -- ^ there was a "big" 'TimesPasses' command + | + CaughtUpN + -- ^ the node transitioned from OnlyBootstrap to CaughtUp + | + FellBehindN + -- ^ the node transitioned from CaughtUp to OnlyBootstrap + | + FlickerN + -- ^ the node transitioned from CaughtUp to OnlyBootstrap and back to + -- CaughtUp "instantly" + | + NotThrashingN + -- ^ the anti-thrashing would have allowed 'FellBehindN', but the selection + -- wasn't old enough + | + TooOldN + -- ^ the selection was old enough for 'FellBehindN', but the anti-thrashing + -- prevented it + deriving (Eq, Ord, Show) + +instance TD.ToExpr Notable where toExpr = TD.defaultExprViaShow + +----- orphans + +instance TD.ToExpr SI.Time where toExpr = TD.defaultExprViaShow +instance TD.ToExpr LedgerStateJudgement where toExpr = TD.defaultExprViaShow + +deriving instance Read LedgerStateJudgement + +instance QC.Arbitrary LedgerStateJudgement where + arbitrary = elements [TooOld, 
YoungEnough] + shrink = \case + TooOld -> [YoungEnough] + YoungEnough -> [] + +instance QC.Arbitrary MarkerState where + arbitrary = elements [Absent, Present] + shrink = \case + Absent -> [Present] + Present -> [] + +----- + +candidateOverSelection :: + Selection + -> Candidate + -> GSM.CandidateVersusSelection +candidateOverSelection (Selection b _s) (Candidate b') = + -- TODO this ignores CandidateDoesNotIntersect, which seems harmless, but + -- I'm not quite sure + GSM.WhetherCandidateIsBetter (b < b') + +----- + +toJudgment :: ModelState -> LedgerStateJudgement +toJudgment = \case + ModelTooOld -> TooOld + ModelYoungEnough{} -> YoungEnough + +toMarker :: LedgerStateJudgement -> MarkerState +toMarker = \case + TooOld -> Absent + YoungEnough -> Present + +----- + +atom :: String -> Bool -> QSM.Logic +atom s b = QSM.Boolean b QSM..// s + +onset :: Selection -> SI.Time +onset (Selection _b (S s)) = SI.Time $ fromIntegral s + +ageLimit :: Num a => a +ageLimit = 10 -- seconds + +thrashLimit :: Num a => a +thrashLimit = 8 -- seconds + +selectionIsBehind :: Model r -> Bool +selectionIsBehind model = + any (\(Candidate b') -> b' > b) cands + where + Model { + mCandidates = cands + , + mSelection = Selection b _s + } = model + +selectionIsNotEarly :: Model r -> Bool +selectionIsNotEarly model = + onset sel <= clk + where + Model { + mClock = clk + , + mSelection = sel + } = model + +-- | Checks that a 'TimePasses' command does not end exactly when a timeout +-- could fire and that a 'ExtendSelection' does not incur a timeout that would +-- fire immediately +-- +-- This insulates the test from race conditions that are innocuous in the real +-- world. +boringDur :: Model r -> Int -> QSM.Logic +boringDur model dur = + boringSelection QSM..&& boringState + where + Model { + mClock = clk + , + mSelection = sel + , + mState = st + } = model + + -- the first time the node would transition to OnlyBootstrap + expiry timestamp = expiryAge `max` expiryThrashing timestamp + expiryAge = SI.addTime ageLimit (onset sel) + expiryThrashing timestamp = SI.addTime thrashLimit timestamp + + clk' = SI.addTime (0.1 * fromIntegral dur) clk + + boringSelection = "boringDur selection" `atom` (clk' /= expiryAge) + + boringState = case st of + ModelTooOld -> QSM.Top + ModelYoungEnough timestamp -> + let gap = clk' `SI.diffTime` expiry timestamp + n = + mod + (diffTimeToPicoseconds gap) + (secondsToPicoseconds thrashLimit) + in + "boringDur state" `atom` (gap < 0 || 0 /= n) + + secondsToPicoseconds x = x * 10 ^ (12 :: Int) From 7509fdbd1f2a9fc48c56e751f9c891f47032f9bf Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Mon, 19 Feb 2024 11:55:49 +0000 Subject: [PATCH 6/7] consensus-diffusion: use LSJ for BulkSync when possible --- .../Ouroboros/Consensus/Node.hs | 7 ++++++ .../Ouroboros/Consensus/NodeKernel.hs | 6 ++++- .../Test/ThreadNet/Network.hs | 3 +++ .../BlockFetch/ClientInterface.hs | 24 +++++++++++++++---- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index c7f1c7a29c..994116be45 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -120,6 +120,7 @@ import Ouroboros.Network.NodeToNode (DiffusionMode (..), ExceptionInHandler (..), MiniProtocolParameters, 
NodeToNodeVersionData (..), RemoteAddress, Socket, blockFetchPipeliningMax, defaultMiniProtocolParameters) +import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers) import Ouroboros.Network.PeerSelection.LedgerPeers (LedgerPeersConsensusInterface (..)) import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics, @@ -189,6 +190,8 @@ data RunNodeArgs m addrNTN addrNTC blk (p2p :: Diffusion.P2P) = RunNodeArgs { -- | Network PeerSharing miniprotocol willingness flag , rnPeerSharing :: PeerSharing + + , rnGetUseBootstrapPeers :: STM m UseBootstrapPeers } -- | Arguments that usually only tests /directly/ specify. @@ -426,6 +429,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = llrnMaxCaughtUpAge (Just durationUntilTooOld) gsmMarkerFileView + rnGetUseBootstrapPeers nodeKernel <- initNodeKernel nodeKernelArgs rnNodeKernelHook registry nodeKernel @@ -693,6 +697,7 @@ mkNodeKernelArgs :: -> NominalDiffTime -> Maybe (GSM.WrapDurationUntilTooOld m blk) -> GSM.MarkerFileView m + -> STM m UseBootstrapPeers -> m (NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk) mkNodeKernelArgs registry @@ -707,6 +712,7 @@ mkNodeKernelArgs maxCaughtUpAge gsmDurationUntilTooOld gsmMarkerFileView + getUseBootstrapPeers = do return NodeKernelArgs { tracers @@ -727,6 +733,7 @@ mkNodeKernelArgs , gsmMarkerFileView , gsmMinCaughtUpDuration = maxCaughtUpAge } + , getUseBootstrapPeers } where defaultBlockFetchConfiguration :: BlockFetchConfiguration diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index db2502fcd8..55aab397d3 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -87,6 +87,7 @@ import Ouroboros.Network.Block (castTip, tipFromHeader) import Ouroboros.Network.BlockFetch import Ouroboros.Network.NodeToNode (ConnectionId, MiniProtocolParameters (..)) +import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers) import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerStateJudgement (..)) import Ouroboros.Network.PeerSharing (PeerSharingRegistry, @@ -161,6 +162,7 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs { , blockFetchConfiguration :: BlockFetchConfiguration , keepAliveRng :: StdGen , gsmArgs :: GsmNodeKernelArgs m blk + , getUseBootstrapPeers :: STM m UseBootstrapPeers } initNodeKernel :: @@ -305,7 +307,7 @@ initInternalState :: initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg , blockFetchSize, btime , mempoolCapacityOverride - , gsmArgs + , gsmArgs, getUseBootstrapPeers } = do varLedgerJudgement <- do let GsmNodeKernelArgs {..} = gsmArgs @@ -333,6 +335,8 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg let readFetchMode = BlockFetchClientInterface.readFetchModeDefault btime (ChainDB.getCurrentChain chainDB) + getUseBootstrapPeers + (readTVar varLedgerJudgement) blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m blockFetchInterface = BlockFetchClientInterface.mkBlockFetchConsensusInterface (configBlock cfg) diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index ff7e9990be..8eda832055 100644 --- 
a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -104,6 +104,8 @@ import Ouroboros.Network.Mock.Chain (Chain (Genesis)) import Ouroboros.Network.NodeToNode (ConnectionId (..), ExpandedInitiatorContext (..), IsBigLedgerPeer (..), MiniProtocolParameters (..), ResponderContext (..)) +import Ouroboros.Network.PeerSelection.Bootstrap + (UseBootstrapPeers (..)) import Ouroboros.Network.PeerSelection.PeerMetric (nullMetric) import Ouroboros.Network.Point (WithOrigin (..)) import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS @@ -1008,6 +1010,7 @@ runThreadNetwork systemTime ThreadNetworkArgs } , gsmMinCaughtUpDuration = 0 } + , getUseBootstrapPeers = pure DontUseBootstrapPeers } nodeKernel <- initNodeKernel nodeKernelArgs diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs index fcecf85c34..c062fb3527 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs @@ -43,6 +43,10 @@ import Ouroboros.Network.Block (MaxSlotNo) import Ouroboros.Network.BlockFetch.ConsensusInterface (BlockFetchConsensusInterface (..), FetchMode (..), FromConsensus (..), WhetherReceivingTentativeBlocks (..)) +import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers, + requiresBootstrapPeers) +import Ouroboros.Network.PeerSelection.LedgerPeers.Type + (LedgerStateJudgement) import Ouroboros.Network.SizeInBytes -- | Abstract over the ChainDB @@ -134,13 +138,23 @@ readFetchModeDefault :: (MonadSTM m, HasHeader blk) => BlockchainTime m -> STM m (AnchoredFragment blk) + -> STM m UseBootstrapPeers + -> STM m LedgerStateJudgement -> STM m FetchMode -readFetchModeDefault btime getCurrentChain = do +readFetchModeDefault btime getCurrentChain + getUseBootstrapPeers getLedgerStateJudgement = do mCurSlot <- getCurrentSlot btime - case mCurSlot of - -- The current chain's tip far away from "now", so use bulk sync mode. - CurrentSlotUnknown -> return FetchModeBulkSync - CurrentSlot curSlot -> do + usingBootstrapPeers <- requiresBootstrapPeers <$> getUseBootstrapPeers + <*> getLedgerStateJudgement + + -- This logic means that when the node is using bootstrap peers and is in + -- TooOld state it will always return BulkSync. Otherwise if the node + -- isn't using bootstrap peers (i.e. has them disabled it will use the old + -- logic of returning BulkSync if behind 1000 slots + case (usingBootstrapPeers, mCurSlot) of + (True, _) -> return FetchModeBulkSync + (False, CurrentSlotUnknown) -> return FetchModeBulkSync + (False, CurrentSlot curSlot) -> do curChainSlot <- AF.headSlot <$> getCurrentChain let slotsBehind = case curChainSlot of -- There's nothing in the chain. 
If the current slot is 0, then From 00ff278c926a64decabe7d3ed3c4836c54ece635 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Tue, 20 Feb 2024 12:45:17 -0800 Subject: [PATCH 7/7] Update changelogs --- ...240220_122811_nick.frisby_bootstrap_gsm.md | 34 +++++++++++++++++++ ...240220_122808_nick.frisby_bootstrap_gsm.md | 30 ++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 ouroboros-consensus-diffusion/changelog.d/20240220_122811_nick.frisby_bootstrap_gsm.md create mode 100644 ouroboros-consensus/changelog.d/20240220_122808_nick.frisby_bootstrap_gsm.md diff --git a/ouroboros-consensus-diffusion/changelog.d/20240220_122811_nick.frisby_bootstrap_gsm.md b/ouroboros-consensus-diffusion/changelog.d/20240220_122811_nick.frisby_bootstrap_gsm.md new file mode 100644 index 0000000000..b09176169f --- /dev/null +++ b/ouroboros-consensus-diffusion/changelog.d/20240220_122811_nick.frisby_bootstrap_gsm.md @@ -0,0 +1,34 @@ + + + + +### Non-Breaking + +- Added `getImmTipSlot` to `NodeKernel` exports. + +### Breaking + +- Added the Genesis State Machine (GSM), though for now it is merely the + simpler [Bootstrap Peers State + Machine](https://ouroboros-consensus.cardano.intersectmbo.org/docs/for-developers/BootstrapPeersIER). + +- Added `rnGetUseBootstrapPeers` to `RunNodeArgs`, for dynamically + enabling/disabling the GSM. The proper GSM must always be running, despite + the TVar it owns being ignored when it's disabled, since it may be enabled at + any time. + +- Added `llrnMaxCaughtUpAge` to the low-level args; defaults to 20min. + +- Added `gsmTracer` to the node's tracers. + +- Added `getNodeIdlers` to the `NodeKernel` interface; tracking peers that have + last sent `MsgAwaitReply`. diff --git a/ouroboros-consensus/changelog.d/20240220_122808_nick.frisby_bootstrap_gsm.md b/ouroboros-consensus/changelog.d/20240220_122808_nick.frisby_bootstrap_gsm.md new file mode 100644 index 0000000000..a7c937290c --- /dev/null +++ b/ouroboros-consensus/changelog.d/20240220_122808_nick.frisby_bootstrap_gsm.md @@ -0,0 +1,30 @@ + + + + + +### Breaking + +- Added `cdbsHasFSGsmDB` to the ChainDB args, for the GSM's persistent marker + file. + +- Added arguments to `bracketChainSyncClient` and `ChainSync.DynamicEnv` for + tracking idling peers. + +- Added arguments to `readFetchModeDefault` for sensitivity to the GSM state: + when using bootstrap peers, simply mimic the GSM state. Otherwise, fall back + to the legacy logic.
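
The fetch-mode change in the final bullet can be restated as a small decision rule. The sketch below is illustrative only: the module and helper names are invented for exposition and are not identifiers added by this series (the FetchMode constructors merely echo the network-layer type of the same name), but the branches mirror the behaviour described in the patch's comment: always bulk sync while bootstrap peers are required, otherwise the pre-existing slots-behind heuristic.

-- Illustrative sketch only: invented names, not identifiers added by this
-- series.
module FetchModeSketch where

data FetchMode = FetchModeBulkSync | FetchModeDeadline
  deriving (Eq, Show)

-- | Decide the fetch mode, assuming the caller has already combined the
-- UseBootstrapPeers flag with the ledger state judgement: True means
-- "bootstrap peers are enabled and the node currently judges itself TooOld".
decideFetchMode
  :: Bool       -- ^ are bootstrap peers required right now?
  -> Maybe Int  -- ^ slots behind the wall clock, if the current slot is known
  -> FetchMode
decideFetchMode requiresBootstrap mSlotsBehind
  | requiresBootstrap = FetchModeBulkSync
  | otherwise = case mSlotsBehind of
      Nothing -> FetchModeBulkSync       -- current slot unknown: keep bulk syncing
      Just slotsBehind
        | slotsBehind < 1000 -> FetchModeDeadline   -- threshold from the legacy logic
        | otherwise          -> FetchModeBulkSync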
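
More broadly, the bootstrap GSM that patches 3 and 5 introduce and test reduces to one rule, restated here as a self-contained sketch with invented names (it is not the interface added by the series): the node counts as caught up only when it has at least one ChainSync peer, every such peer is idling (its last message was MsgAwaitReply), and no candidate is strictly better than the current selection; once caught up, it may fall back to TooOld only after the selection's onset is older than the age limit, and never sooner than the anti-thrashing limit after the most recent transition to CaughtUp.

-- Illustrative sketch only: invented names, not the interface added by
-- these patches.
module GsmRuleSketch where

import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

type Peer = String

-- | Is the node currently entitled to consider itself caught up?
caughtUp
  :: Ord len
  => len                -- ^ length of the current selection
  -> Map.Map Peer len   -- ^ candidate length per ChainSync peer
  -> Set.Set Peer       -- ^ peers whose last message was MsgAwaitReply
  -> Bool
caughtUp selection candidates idlers =
     not (Map.null candidates)
  && Map.keysSet candidates == idlers
  && all (<= selection) candidates

-- | Once caught up, may the node fall back to TooOld at time @now@?
mayFallBack
  :: Double  -- ^ age limit for the selection, in seconds
  -> Double  -- ^ anti-thrashing minimum dwell time in CaughtUp, in seconds
  -> Double  -- ^ onset of the selection's slot
  -> Double  -- ^ time of the most recent transition to CaughtUp
  -> Double  -- ^ now
  -> Bool
mayFallBack ageLimit thrashLimit selectionOnset caughtUpSince now =
  now > max (selectionOnset + ageLimit) (caughtUpSince + thrashLimit)

The QSM model above fixes these limits at 10 seconds (ageLimit) and 8 seconds (thrashLimit) so that interesting transitions happen quickly under IOSim, whereas the node itself defaults llrnMaxCaughtUpAge to 20 minutes.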