Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Randomized ProtocolTimeLimits #4980

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
10 changes: 10 additions & 0 deletions cardano-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## next version

### Breaking changes

* Reimplemntation of `subscribe` without relaying on non-p2p stack. Its
arguments have changed. Note that the `NodeToClientProtocols` and
`OuroborosApplicationWithMinimalCtx` specify `Void` as return type of the
responder side.
* The default reconnect delay was increased from `0.025s` to `5s`.

### Non-breaking changes

## 0.3.1.5 -- 2024-08-27

### Breaking changes
Expand Down
3 changes: 3 additions & 0 deletions cardano-client/cardano-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ library
build-depends:
base >=4.14 && <4.21,
bytestring >=0.10 && <0.13,
cborg,
containers,
contra-tracer,
network-mux ^>=0.4.5,
ouroboros-network >=0.9 && <0.18,
ouroboros-network-api >=0.5.2 && <0.10,
ouroboros-network-framework >=0.8 && <0.14,
si-timers,

ghc-options:
-Wall
Expand Down
142 changes: 109 additions & 33 deletions cardano-client/src/Cardano/Client/Subscription.hs
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Cardano.Client.Subscription
( subscribe
( -- * Subscription API
subscribe
, SubscriptionParams (..)
, SubscriptionTracers (..)
, SubscriptionTrace (..)
-- * Re-exports
-- ** Mux
, MuxMode (..)
, ConnectionId
, LocalAddress
, MuxTrace
, WithMuxBearer
-- ** Connections
, ConnectionId (..)
, LocalAddress (..)
-- ** Protocol API
, NodeToClientProtocols (..)
, MiniProtocolCb (..)
, MuxTrace
, RunMiniProtocol (..)
, WithMuxBearer
, ControlMessage (..)
) where

import Codec.CBOR.Term qualified as CBOR
import Control.Exception
import Control.Monad (join)
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)
import Data.ByteString.Lazy qualified as BSL
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Void (Void)

import Network.Mux.Trace (MuxTrace, WithMuxBearer)
Expand All @@ -27,15 +43,43 @@ import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Magic (NetworkMagic)
import Ouroboros.Network.Mux (MiniProtocolCb (..), MuxMode (..),
OuroborosApplicationWithMinimalCtx, RunMiniProtocol (..))
import Ouroboros.Network.NodeToClient (ClientSubscriptionParams (..),
ConnectionId, LocalAddress, NetworkClientSubcriptionTracers,
NodeToClientProtocols (..), NodeToClientVersion,
NodeToClientVersionData (NodeToClientVersionData),
ncSubscriptionWorker, newNetworkMutableState,
versionedNodeToClientProtocols)
import Ouroboros.Network.Protocol.Handshake.Version (Versions, foldMapVersions)

import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.NodeToClient (Handshake, LocalAddress (..),
NetworkConnectTracers (..), NodeToClientProtocols,
NodeToClientVersion, NodeToClientVersionData (..), TraceSendRecv,
Versions)
import Ouroboros.Network.NodeToClient qualified as NtC
import Ouroboros.Network.Snocket qualified as Snocket

data SubscriptionParams a = SubscriptionParams
{ spAddress :: !LocalAddress
-- ^ unix socket or named pipe address
, spReconnectionDelay :: !(Maybe DiffTime)
-- ^ delay between connection attempts. The default value is `5s`.
, spCompleteCb :: Either SomeException a -> Decision
}

data Decision =
Abort
-- ^ abort subscription loop
| Reconnect
-- ^ reconnect

data SubscriptionTracers = SubscriptionTracers {
stMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace),
-- ^ low level mux-network tracer, which logs mux sdu (send and received)
-- and other low level multiplexing events.
stHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress)
(TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))),
-- ^ handshake protocol tracer; it is important for analysing version
-- negotation mismatches.
stSubscriptionTracer :: Tracer IO SubscriptionTrace
}

data SubscriptionTrace = SubscriptionError SomeException
deriving Show

-- | Subscribe using `node-to-client` mini-protocol.
--
-- 'blockVersion' ought to be instantiated with `BlockNodeToClientVersion blk`.
Expand All @@ -44,34 +88,63 @@ import Ouroboros.Network.Snocket qualified as Snocket
-- `Ouroboros.Consensus.Network.NodeToClient.clientCodecs`.
--
subscribe
:: forall blockVersion x y.
:: forall blockVersion a.
Snocket.LocalSnocket
-> NetworkMagic
-> Map NodeToClientVersion blockVersion
-- ^ Use `supportedNodeToClientVersions` from `ouroboros-consensus`.
-> NetworkClientSubcriptionTracers
-> ClientSubscriptionParams ()
-> SubscriptionTracers
-> SubscriptionParams a
-> ( NodeToClientVersion
-> blockVersion
-> NodeToClientProtocols 'InitiatorMode LocalAddress BSL.ByteString IO x y)
-> IO Void
subscribe snocket networkMagic supportedVersions tracers subscriptionParams protocols = do
networkState <- newNetworkMutableState
ncSubscriptionWorker
snocket
tracers
networkState
subscriptionParams
(versionedProtocols networkMagic supportedVersions protocols)
-> NodeToClientProtocols 'InitiatorMode LocalAddress BSL.ByteString IO a Void)
-> IO ()
subscribe snocket networkMagic supportedVersions
SubscriptionTracers {
stMuxTracer = muxTracer,
stHandshakeTracer = handshakeTracer,
stSubscriptionTracer = tracer
}
SubscriptionParams {
spAddress = addr,
spReconnectionDelay = reConnDelay,
spCompleteCb = completeCb
}
protocols =
mask $ \unmask ->
loop unmask $
NtC.connectTo
snocket
NetworkConnectTracers {
nctMuxTracer = muxTracer,
nctHandshakeTracer = handshakeTracer
}
(versionedProtocols networkMagic supportedVersions protocols)
(getFilePath addr)
where
loop :: (forall x. IO x -> IO x) -> IO (Either SomeException a) -> IO ()
loop unmask act = do
r <- fn <$> try (unmask act)
case r of
Right _ -> pure ()
Left e -> traceWith tracer (SubscriptionError e)
case completeCb r of
Abort -> pure ()
Reconnect -> do
threadDelay (fromMaybe 5 reConnDelay)
loop unmask act

fn :: forall x y. Either x (Either x y) -> Either x y
fn = join

versionedProtocols ::
forall m appType bytes blockVersion a b.
forall m appType bytes blockVersion a.
NetworkMagic
-> Map NodeToClientVersion blockVersion
-- ^ Use `supportedNodeToClientVersions` from `ouroboros-consensus`.
-> ( NodeToClientVersion
-> blockVersion
-> NodeToClientProtocols appType LocalAddress bytes m a b)
-> NodeToClientProtocols appType LocalAddress bytes m a Void)
-- ^ callback which receives codecs, connection id and STM action which
-- can be checked if the networking runtime system requests the protocols
-- to stop.
Expand All @@ -82,18 +155,21 @@ versionedProtocols ::
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a b)
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a Void)
versionedProtocols networkMagic supportedVersions callback =
foldMapVersions applyVersion $ Map.toList supportedVersions
NtC.foldMapVersions applyVersion (Map.toList supportedVersions)
where
applyVersion
:: (NodeToClientVersion, blockVersion)
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a b)
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a Void)
applyVersion (version, blockVersion) =
versionedNodeToClientProtocols
NtC.versionedNodeToClientProtocols
version
(NodeToClientVersionData networkMagic False)
NodeToClientVersionData {
networkMagic,
query = False
}
(callback version blockVersion)
10 changes: 10 additions & 0 deletions network-mux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

### Breaking changes

* Removed `Netowrk.Mux.Compat` module with legacy API.
* `Ouroboros.Network.Mux.toApplication` was removed.
* `Ouroboros.Network.Mux.mkMiniProtocolBundle` was renamed to
`mkMiniProtocolInfos`, its type changed.
* Removed `MiniProtocolBundle` newtype wrapper.
* Generalised `Channel` type and provide `ByteChannel` type alias.
* Provide additional APIs in the `Network.Mux.Channel` for creating channels
and byte channels.
* `MuxBearer` has a `name` field.

### Non-breaking changes

* Fix compilation with `tracetcpinfo` flag.
Expand Down
10 changes: 4 additions & 6 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ serverWorker bearer = do

runMux nullTracer mux bearer
where
ptcls :: MiniProtocolBundle ResponderMode
ptcls = MiniProtocolBundle
[ MiniProtocolInfo {
ptcls :: [MiniProtocolInfo ResponderMode]
ptcls = [ MiniProtocolInfo {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolDir = ResponderDirectionOnly,
miniProtocolLimits = defaultProtocolLimits
Expand Down Expand Up @@ -195,9 +194,8 @@ clientWorker bearer n msg = do

runMux nullTracer mux bearer
where
ptcls :: MiniProtocolBundle InitiatorMode
ptcls = MiniProtocolBundle
[ MiniProtocolInfo {
ptcls :: [MiniProtocolInfo InitiatorMode]
ptcls = [ MiniProtocolInfo {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolDir = InitiatorDirectionOnly,
miniProtocolLimits = defaultProtocolLimits
Expand Down
1 change: 0 additions & 1 deletion network-mux/network-mux.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ library
Network.Mux.Bearer.Socket
Network.Mux.Channel
Network.Mux.Codec
Network.Mux.Compat
Network.Mux.DeltaQ.TraceStats
Network.Mux.DeltaQ.TraceStatsSupport
Network.Mux.DeltaQ.TraceTransformer
Expand Down
Loading
Loading