Skip to content

Commit

Permalink
Update Splice from CCI (#39)
Browse files Browse the repository at this point in the history
Signed-off-by: DA Automation <[email protected]>
Co-authored-by: DA Automation <[email protected]>
  • Loading branch information
canton-network-da and DA Automation authored Sep 26, 2024
1 parent a9c8b64 commit e75f88e
Show file tree
Hide file tree
Showing 24 changed files with 517 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,17 @@ object ConfigTransforms {
)
)

/** Builds a [[ConfigTransform]] that sets (or clears) the gRPC deadline of the
  * treasury service on every validator app backend config.
  *
  * @param grpcDeadline the deadline to apply to treasury-service gRPC calls;
  *                     `None` clears any previously configured deadline
  */
def setGrpcDeadlineForTreasuryService(
    grpcDeadline: Option[NonNegativeFiniteDuration]
): ConfigTransform =
  // Delegates to the bulk-update helper so the deadline is applied uniformly
  // to all validator app configs.
  ConfigTransforms.updateAllValidatorAppConfigs_(c =>
    c.copy(treasury =
      c.treasury.copy(
        grpcDeadline = grpcDeadline
      )
    )
  )

def updateAllValidatorAppConfigs(
update: (String, ValidatorAppBackendConfig) => ValidatorAppBackendConfig
): ConfigTransform =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ class DecentralizedSynchronizerMigrationIntegrationTest
updateAutomationConfig(ConfigurableApp.Sv)(
_.withPausedTrigger[ReceiveSvRewardCouponTrigger]
)(conf),
(_, conf) =>
ConfigTransforms.setGrpcDeadlineForTreasuryService(
Some(NonNegativeFiniteDuration.ofSeconds(10))
)(conf),
)
.withManualStart
// TODO (#10859) remove and fix test failures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,11 @@ class DockerComposeFullNetworkFrontendIntegrationTest
login(80, "administrator", "wallet.localhost"),
)(
"administrator is already onboarded",
_ => {
seleniumText(find(id("logged-in-user"))) should startWith(partyHint)
// Wait for some traffic to be bought before proceeding, so that we don't
// hit a "traffic below reserved amount" error
val txs = findAll(className("tx-row")).toSeq
val trafficPurchases = txs.filter { txRow =>
txRow.childElement(className("tx-action")).text.contains("Sent") &&
txRow.childElement(className("tx-subtype")).text.contains("Extra Traffic Purchase")
}
trafficPurchases should not be empty
},
_ => seleniumText(find(id("logged-in-user"))) should startWith(partyHint),
)
// Wait for some traffic to be bought before proceeding, so that we don't
// hit a "traffic below reserved amount" error
waitForTrafficPurchase()
go to "http://wallet.localhost"
actAndCheck(
"Login as alice",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,9 @@ class DockerComposeValidatorFrontendIntegrationTest
login(80, "administrator", "wallet.localhost"),
)(
"administrator is already onboarded",
_ => {
seleniumText(find(id("logged-in-user"))) should startWith(partyHint)
val txs = findAll(className("tx-row")).toSeq
val trafficPurchases = txs.filter { txRow =>
txRow.childElement(className("tx-action")).text.contains("Sent") &&
txRow.childElement(className("tx-subtype")).text.contains("Extra Traffic Purchase")
}
trafficPurchases should not be empty
},
_ => seleniumText(find(id("logged-in-user"))) should startWith(partyHint),
)
waitForTrafficPurchase()
actAndCheck(
"Login as alice",
loginOnCurrentPage(80, "alice", "wallet.localhost"),
Expand Down Expand Up @@ -197,18 +190,11 @@ class DockerComposeValidatorFrontendIntegrationTest
login(80, "administrator", "wallet.localhost"),
)(
"administrator is already onboarded",
_ => {
seleniumText(find(id("logged-in-user"))) should startWith(partyHint)
// Wait for some traffic to be bought before proceeding, so that we don't
// hit a "traffic below reserved amount" error
val txs = findAll(className("tx-row")).toSeq
val trafficPurchases = txs.filter { txRow =>
txRow.childElement(className("tx-action")).text.contains("Sent") &&
txRow.childElement(className("tx-subtype")).text.contains("Extra Traffic Purchase")
}
trafficPurchases should not be empty
},
_ => seleniumText(find(id("logged-in-user"))) should startWith(partyHint),
)
// Wait for some traffic to be bought before proceeding, so that we don't
// hit a "traffic below reserved amount" error
waitForTrafficPurchase()
clue("Alice can onboard again") {
actAndCheck(
"Login as alice",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import com.daml.network.sv.automation.delegatebased.{
AdvanceOpenMiningRoundTrigger,
ExpireIssuingMiningRoundTrigger,
}
import com.daml.network.sv.automation.singlesv.LocalSequencerConnectionsTrigger
import com.digitalasset.canton.{BaseTest, DomainAlias, SequencerAlias}
import com.digitalasset.canton.config.NonNegativeFiniteDuration
import com.digitalasset.canton.config.RequireTypes.NonNegativeLong
Expand Down Expand Up @@ -208,10 +209,8 @@ class SoftDomainMigrationTopologySetupIntegrationTest
)
// TODO(#14419) Remove this once the retries cover all required errors
setTriggersWithin(triggersToPauseAtStart =
Seq(
aliceValidatorBackend.validatorAutomation.trigger[ReconcileSequencerConnectionsTrigger],
bobValidatorBackend.validatorAutomation.trigger[ReconcileSequencerConnectionsTrigger],
splitwellValidatorBackend.validatorAutomation.trigger[ReconcileSequencerConnectionsTrigger],
Seq(aliceValidatorBackend, bobValidatorBackend, splitwellValidatorBackend).map(
_.validatorAutomation.trigger[ReconcileSequencerConnectionsTrigger]
)
) {
clue("Setup splitwell") {
Expand Down Expand Up @@ -281,17 +280,24 @@ class SoftDomainMigrationTopologySetupIntegrationTest
},
)("amulet config vote request has been created", _ => sv1Backend.listVoteRequests().loneElement)

clue(s"sv2-4 accept amulet config vote request") {
Seq(sv2Backend, sv3Backend, sv4Backend).map(sv =>
eventuallySucceeds() {
sv.castVote(
voteRequest.contractId,
true,
"url",
"description",
)
}
// TODO(#8300) No need to pause once we can't get a timeout on a concurrent sequencer connection change anymore
setTriggersWithin(triggersToPauseAtStart =
Seq(sv2Backend, sv3Backend, sv4Backend).map(
_.dsoAutomation.trigger[LocalSequencerConnectionsTrigger]
)
) {
clue(s"sv2-4 accept amulet config vote request") {
Seq(sv2Backend, sv3Backend, sv4Backend).map(sv =>
eventuallySucceeds() {
sv.castVote(
voteRequest.contractId,
true,
"url",
"description",
)
}
)
}
}

eventually() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,19 @@ trait WalletFrontendTestUtil extends WalletTestUtil { self: FrontendTestCommon =
amountO.map(Tap(date, _))
}

/** Waits until the wallet transaction list shows at least one completed
  * "Extra Traffic Purchase" transfer, retrying for up to one minute.
  *
  * Call this before actions that consume traffic, so the test does not hit a
  * "traffic below reserved amount" error while the first purchase is pending.
  *
  * @param driver the Selenium web driver pointed at the wallet UI
  */
protected def waitForTrafficPurchase()(implicit driver: WebDriverType) = {
  // Fix: the clue message previously read "Waitig" — corrected to "Waiting".
  clue("Waiting for a traffic purchase") {
    eventually(1.minute) {
      val txs = findAll(className("tx-row")).toSeq
      // A traffic purchase row shows action "Sent" with subtype
      // "Extra Traffic Purchase".
      val trafficPurchases = txs.filter { txRow =>
        txRow.childElement(className("tx-action")).text.contains("Sent") &&
        txRow.childElement(className("tx-subtype")).text.contains("Extra Traffic Purchase")
      }
      trafficPurchases should not be empty
    }
  }
}

private def readDateFromRow(transactionRow: Element): String =
transactionRow
.childElement(className("tx-row-cell-date"))
Expand Down
19 changes: 19 additions & 0 deletions apps/common/src/main/openapi/common-internal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ components:
type: string

NodeKey:
oneOf:
- $ref: "#/components/schemas/KeyPair"
- $ref: "#/components/schemas/KmsKeyId"

KeyPair:
type: object
required:
- keyPair
Expand All @@ -91,6 +96,20 @@ components:
name:
type: string

KmsKeyId:
type: object
required:
- type
- keyId
properties:
type:
type: string
enum: [ "signing", "encryption" ]
keyId:
type: string
name:
type: string

ParticipantUser:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,31 @@ final case class NodeIdentitiesDump(
version: Option[String],
) extends PrettyPrinting {
def toHttp: http.NodeIdentitiesDump = {
val httpKeys = keys.map {
case NodeIdentitiesDump.NodeKey.KeyPair(keyPair, name) =>
http.NodeKey.members.KeyPair(
http.KeyPair(Base64.getEncoder.encodeToString(keyPair.toArray), name)
): http.NodeKey
case NodeIdentitiesDump.NodeKey.KmsKeyId(
NodeIdentitiesDump.NodeKey.KeyType.Signing,
keyId,
name,
) =>
http.NodeKey.members.KmsKeyId(
http.KmsKeyId(http.KmsKeyId.Type.Signing, keyId, name)
): http.NodeKey
case NodeIdentitiesDump.NodeKey.KmsKeyId(
NodeIdentitiesDump.NodeKey.KeyType.Encryption,
keyId,
name,
) =>
http.NodeKey.members.KmsKeyId(
http.KmsKeyId(http.KmsKeyId.Type.Encryption, keyId, name)
): http.NodeKey
}.toVector
http.NodeIdentitiesDump(
id.toProtoPrimitive,
keys
.map(key => http.NodeKey(Base64.getEncoder.encodeToString(key.keyPair.toArray), key.name))
.toVector,
httpKeys,
Base64.getEncoder.encodeToString(authorizedStoreSnapshot.toByteArray),
version,
)
Expand Down Expand Up @@ -55,8 +75,18 @@ object NodeIdentitiesDump {
Try(
NodeIdentitiesDump(
id = id(response.id),
keys =
response.keys.toSeq.map(k => NodeKey(Base64.getDecoder.decode(k.keyPair).toSeq, k.name)),
keys = response.keys.toSeq.map {
case http.NodeKey.members.KmsKeyId(
http.KmsKeyId(http.KmsKeyId.Type.members.Signing, keyId, name)
) =>
NodeKey.KmsKeyId(NodeKey.KeyType.Signing, keyId, name)
case http.NodeKey.members.KmsKeyId(
http.KmsKeyId(http.KmsKeyId.Type.members.Encryption, keyId, name)
) =>
NodeKey.KmsKeyId(NodeKey.KeyType.Encryption, keyId, name)
case http.NodeKey.members.KeyPair(http.KeyPair(keyPair, name)) =>
NodeKey.KeyPair(Base64.getDecoder.decode(keyPair).toSeq, name)
},
authorizedStoreSnapshot =
ByteString.copyFrom(Base64.getDecoder.decode(response.authorizedStoreSnapshot)),
version = response.version,
Expand All @@ -79,8 +109,26 @@ object NodeIdentitiesDump {
.flatMap(fromHttp(id, _))
}

final case class NodeKey(
keyPair: Seq[Byte],
name: Option[String],
)
/** A single key entry in a node identities dump: either a raw exported key
  * pair, or a reference to a key held in an external KMS.
  */
sealed trait NodeKey {
  // Optional human-readable name attached to the key.
  def name: Option[String]
}

object NodeKey {
  /** Purpose of a KMS-held key: signing or encryption. */
  sealed trait KeyType
  object KeyType {
    case object Signing extends KeyType
    case object Encryption extends KeyType
  }

  /** A key pair exported as raw bytes (Base64-decoded from the HTTP form). */
  final case class KeyPair(
      keyPair: Seq[Byte],
      name: Option[String],
  ) extends NodeKey

  /** A reference to a key stored in a KMS, identified by its key id; the key
    * material itself never leaves the KMS.
    */
  final case class KmsKeyId(
      keyType: KeyType,
      keyId: String,
      name: Option[String],
  ) extends NodeKey
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ class NodeIdentitiesStore(
for {
id <- adminConnection.identity()
keysMetadata <- adminConnection.listMyKeys()
// TODO(#14916): We should support key ids when we consume the change (DACH-NY/canton#21429) from Canton
keys <- keysMetadata.traverse(keyM =>
adminConnection
.exportKeyPair(keyM.publicKeyWithName.publicKey.id)
.map(keyBytes =>
NodeIdentitiesDump.NodeKey(
NodeIdentitiesDump.NodeKey.KeyPair(
keyBytes.toByteArray.toSeq,
keyM.publicKeyWithName.name.map(_.unwrap),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,14 @@ class NodeInitializer(
s"Uploading node keys ${dump.keys.map(_.name)} from dump for id ${dump.id}, new node id: $expectedId"
)
// this is idempotent
dump.keys.traverse_(key => connection.importKeyPair(key.keyPair.toArray, key.name))
dump.keys.traverse_ {
case NodeIdentitiesDump.NodeKey.KeyPair(keyPair, name) =>
connection.importKeyPair(keyPair.toArray, name)
case NodeIdentitiesDump.NodeKey.KmsKeyId(_, _, _) =>
// it is not possible for now unless someone manually creates dump with key ids
// TODO(#14916): We should support key ids when we consume the change (DACH-NY/canton#21429) from Canton
throw new UnsupportedOperationException("KMS keys are not supported")
}
}

private def importAuthorizedStoreSnapshot(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.digitalasset.canton.tracing.TraceContext
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
import slick.jdbc.{GetResult, JdbcProfile}

import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}

class AcsSnapshotStore(
Expand Down Expand Up @@ -62,20 +63,26 @@ class AcsSnapshotStore(
)(implicit
tc: TraceContext
): Future[Int] = {
val from = lastSnapshot.map(_.snapshotRecordTime).getOrElse(CantonTimestamp.MinValue)
val previousSnapshotDataFiler = lastSnapshot match {
case Some(AcsSnapshot(_, _, _, firstRowId, lastRowId)) =>
sql"where snapshot.row_id >= $firstRowId and snapshot.row_id <= $lastRowId"
case None =>
sql"where false"
}
storage.update(
(sql"""
Future {
scala.concurrent.blocking {
AcsSnapshotStore.PreventConcurrentSnapshotsSemaphore.acquire()
}
}.flatMap { _ =>
val from = lastSnapshot.map(_.snapshotRecordTime).getOrElse(CantonTimestamp.MinValue)
val previousSnapshotDataFiler = lastSnapshot match {
case Some(AcsSnapshot(_, _, _, firstRowId, lastRowId)) =>
sql"where snapshot.row_id >= $firstRowId and snapshot.row_id <= $lastRowId"
case None =>
sql"where false"
}
storage.update(
(sql"""
with inserted_rows as (
with previous_snapshot_data as (select contract_id
from acs_snapshot_data snapshot
join update_history_creates creates on snapshot.create_id = creates.row_id
""" ++ previousSnapshotDataFiler ++ sql"""),
""" ++ previousSnapshotDataFiler ++
sql"""),
transactions_in_snapshot as not materialized ( -- materialized yields a worse plan
select row_id
from update_history_transactions txs
Expand Down Expand Up @@ -126,8 +133,11 @@ class AcsSnapshotStore(
from inserted_rows
having min(row_id) is not null;
""").toActionBuilder.asUpdate,
"insertNewSnapshot",
)
"insertNewSnapshot",
)
}.andThen { _ =>
AcsSnapshotStore.PreventConcurrentSnapshotsSemaphore.release()
}
}

def queryAcsSnapshot(
Expand Down Expand Up @@ -295,6 +305,9 @@ class AcsSnapshotStore(

object AcsSnapshotStore {

// Only relevant for tests, in production this is already guaranteed.
private val PreventConcurrentSnapshotsSemaphore = new Semaphore(1)

case class AcsSnapshot(
snapshotRecordTime: CantonTimestamp,
migrationId: Long,
Expand Down
Loading

0 comments on commit e75f88e

Please sign in to comment.