Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add database tracking and report for Push Reliability #769

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct ClientAck {
#[serde(rename = "channelID")]
pub channel_id: Uuid,
pub version: String,
#[serde(default)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a comment - and potentially comments for channel_id and version, even if it's self evident - to describe purpose of the reliability_id

pub reliability_id: Option<String>,
}

#[derive(Debug, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl AppState {
db_settings: settings.db_settings.clone(),
};
let storage_type = StorageType::from_dsn(&db_settings.dsn);

#[allow(unused)]
let db: Box<dyn DbClient> = match storage_type {
#[cfg(feature = "bigtable")]
Expand Down
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ ctor.workspace = true
tokio.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these features will be used in the next PR.

[features]
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ async-stream = "0.3"
ctor.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ tokio.workspace = true
serde_json.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
7 changes: 7 additions & 0 deletions autoendpoint/src/extractors/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl From<Notification> for autopush_common::notification::Notification {
timestamp: notification.timestamp,
data: notification.data,
sortkey_timestamp,
reliability_id: notification.subscription.reliability_id,
headers: {
let headers: HashMap<String, String> = notification.headers.into();
if headers.is_empty() {
Expand Down Expand Up @@ -171,6 +172,12 @@ impl Notification {
map.insert("ttl", serde_json::to_value(self.headers.ttl).unwrap());
map.insert("topic", serde_json::to_value(&self.headers.topic).unwrap());
map.insert("timestamp", serde_json::to_value(self.timestamp).unwrap());
if let Some(reliability_id) = &self.subscription.reliability_id {
map.insert(
"reliability_id",
serde_json::to_value(reliability_id).unwrap(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the signature returns HashMap<&'static str, serde_json::Value> regardless of success/fail with the .unwrap() calls. Did we want to simplify this with the use of ? in a few places? Here's my thought:

Eg.

/// Serialize the notification for delivery to the connection server.
pub fn serialize_for_delivery(&self) -> Result<HashMap<&'static str, serde_json::Value>, serde_json::Error> {
    let mut map = HashMap::new();
    
    map.insert(
        "channelID",
        serde_json::to_value(self.subscription.channel_id)?
    );
    map.insert("version", serde_json::to_value(&self.message_id)?);
    map.insert("ttl", serde_json::to_value(self.headers.ttl)?);
    map.insert("topic", serde_json::to_value(&self.headers.topic)?);
    map.insert("timestamp", serde_json::to_value(self.timestamp)?);

    if let Some(reliability_id) = &self.subscription.reliability_id {
        map.insert("reliability_id", serde_json::to_value(reliability_id)?);
    }

    if let Some(data) = &self.data {
        map.insert("data", serde_json::to_value(data)?);

        let headers: HashMap<_, _> = self.headers.clone().into();
        map.insert("headers", serde_json::to_value(headers)?);
    }

    Ok(map)
}

Therefore the single call to this function in autoendpoint/src/routers/webpush.rs has a surrounding context to returning a Result type, so it'd probably be minimal adjustment with just calling a single .unwrap()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. I didn't want to modify a bunch of extra things, but I think you're right. I'll update ApiErrorKind

);
}

if let Some(data) = &self.data {
map.insert("data", serde_json::to_value(data).unwrap());
Expand Down
19 changes: 9 additions & 10 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Subscription {
/// (This should ONLY be applied for messages that match known
/// Mozilla provided VAPID public keys.)
///
pub tracking_id: Option<String>,
pub reliability_id: Option<String>,
}

impl FromRequest for Subscription {
Expand Down Expand Up @@ -75,11 +75,13 @@ impl FromRequest for Subscription {
.transpose()?;

trace!("raw vapid: {:?}", &vapid);
let trackable = if let Some(vapid) = &vapid {
app_state.reliability.is_trackable(vapid)
} else {
false
};
let reliability_id: Option<String> = vapid.clone().and_then(|v| {
app_state
.vapid_tracker
.is_trackable(&v)
.then(|| app_state.vapid_tracker.get_id(req.headers()))
});
debug!("🔍 Assigning Reliability: {:?}", reliability_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
debug!("🔍 Assigning Reliability: {:?}", reliability_id);
debug!("🔍 Assigning Reliability ID: {:?}", reliability_id);


// Capturing the vapid sub right now will cause too much cardinality. Instead,
// let's just capture if we have a valid VAPID, as well as what sort of bad sub
Expand Down Expand Up @@ -134,14 +136,11 @@ impl FromRequest for Subscription {
.incr(&format!("updates.vapid.draft{:02}", vapid.vapid.version()))?;
}

let tracking_id =
trackable.then(|| app_state.reliability.get_tracking_id(req.headers()));

Ok(Subscription {
user,
channel_id,
vapid,
tracking_id,
reliability_id,
})
}
.boxed_local()
Expand Down
8 changes: 7 additions & 1 deletion autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub fn build_message_data(notification: &Notification) -> ApiResult<HashMap<&'st
message_data.insert_opt("enc", notification.headers.encryption.as_ref());
message_data.insert_opt("cryptokey", notification.headers.crypto_key.as_ref());
message_data.insert_opt("enckey", notification.headers.encryption_key.as_ref());
// Report the data to the UA. How this value is reported back is still a work in progress.
trace!(
"🔍 Sending Reliability: {:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, I'd update this to

Suggested change
"🔍 Sending Reliability: {:?}",
"🔍 Sending Reliability ID: {:?}",

and perhaps the other instances this is logged

notification.subscription.reliability_id
);
message_data.insert_opt("rid", notification.subscription.reliability_id.as_ref());
}

Ok(message_data)
Expand Down Expand Up @@ -239,7 +245,7 @@ pub mod tests {
user,
channel_id: channel_id(),
vapid: None,
tracking_id: None,
reliability_id: None,
},
headers: NotificationHeaders {
ttl: 0,
Expand Down
6 changes: 3 additions & 3 deletions autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct AppState {
pub apns_router: Arc<ApnsRouter>,
#[cfg(feature = "stub")]
pub stub_router: Arc<StubRouter>,
pub reliability: Arc<VapidTracker>,
pub vapid_tracker: Arc<VapidTracker>,
}

pub struct Server;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Server {
)
.await?,
);
let reliability = Arc::new(VapidTracker(settings.tracking_keys()));
let vapid_tracker = Arc::new(VapidTracker(settings.tracking_keys()));
#[cfg(feature = "stub")]
let stub_router = Arc::new(StubRouter::new(settings.stub.clone())?);
let app_state = AppState {
Expand All @@ -122,7 +122,7 @@ impl Server {
apns_router,
#[cfg(feature = "stub")]
stub_router,
reliability,
vapid_tracker,
};

spawn_pool_periodic_reporter(
Expand Down
31 changes: 21 additions & 10 deletions autoendpoint/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ impl Settings {
// public key, but that may not always be true.
pub fn tracking_keys(&self) -> Vec<String> {
let keys = &self.tracking_keys.replace(['"', ' '], "");
Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned())
.collect()
let reply = Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned().replace("=", ""))
.collect();
trace!("🔍 keys: {:?}", reply);
reply
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit, but thought result is better than reply since it's not calling out to anywhere? And making the trace output linked be a little more explicit, given keys might be a bit vague.

Suggested change
let reply = Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned().replace("=", ""))
.collect();
trace!("🔍 keys: {:?}", reply);
reply
let result = Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned().replace("=", ""))
.collect();
trace!("🔍 tracking_keys: {:?}", reply);
result

}

/// Get the URL for this endpoint server
Expand All @@ -199,11 +201,20 @@ impl VapidTracker {
pub fn is_trackable(&self, vapid: &VapidHeaderWithKey) -> bool {
// ideally, [Settings.with_env_and_config_file()] does the work of pre-populating
// the Settings.tracking_vapid_pubs cache, but we can't rely on that.
self.0.contains(&vapid.public_key)
let key = vapid.public_key.replace('=', "");
let result = self.0.contains(&key);
debug!("🔍 Checking {key} {}", {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super useful, good idea 👍

if result {
"Match!"
} else {
"no match"
}
});
result
}

/// Extract the message Id from the headers (if present), otherwise just make one up.
pub fn get_tracking_id(&self, headers: &HeaderMap) -> String {
pub fn get_id(&self, headers: &HeaderMap) -> String {
headers
.get("X-MessageId")
.and_then(|v|
Expand Down Expand Up @@ -310,7 +321,7 @@ mod tests {
#[test]
fn test_tracking_keys() -> ApiResult<()> {
let settings = Settings{
tracking_keys: r#"["BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7vI"]"#.to_owned(),
tracking_keys: r#"["BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7"]"#.to_owned(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified to check for padding stripping.

Copy link
Contributor

@taddes taddes Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this one have the "==" added to end as well, with exclusion of vI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were decoding the key from base64, yes. We're currently not doing that so just tweaking the string values to ensure that they match (with and without the padding) should be fine.

If we ever decide to decode these strings and do a byte comparison of the decoded pairs, then we would have to revisit this test, but that's out of scope for this PR.

..Default::default()
};

Expand All @@ -320,7 +331,7 @@ mod tests {
token: "".to_owned(),
version_data: crate::headers::vapid::VapidVersionData::Version1,
},
public_key: "BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7vI".to_owned()
public_key: "BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7==".to_owned()
};

let key_set = settings.tracking_keys();
Expand All @@ -333,20 +344,20 @@ mod tests {
}

#[test]
fn test_tracking_id() -> ApiResult<()> {
fn test_reliability_id() -> ApiResult<()> {
let mut headers = HeaderMap::new();
let keys = Vec::new();
let reliability = VapidTracker(keys);

let key = reliability.get_tracking_id(&headers);
let key = reliability.get_id(&headers);
assert!(!key.is_empty());

headers.insert(
HeaderName::from_lowercase(b"x-messageid").unwrap(),
HeaderValue::from_static("123foobar456"),
);

let key = reliability.get_tracking_id(&headers);
let key = reliability.get_id(&headers);
assert_eq!(key, "123foobar456".to_owned());

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ form_urlencoded = { version = "1.2", optional = true }
[dev-dependencies]
mockito = "0.31"
tempfile = "3.2.0"
tokio = { workspace=true, features = ["macros"] }
tokio = { workspace = true, features = ["macros"] }
actix-rt = "2.8"

[features]
Expand Down
16 changes: 16 additions & 0 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ impl BigTableClientImpl {
)
})?;

// Create from the known, required fields.
let mut notif = Notification {
channel_id: range_key.channel_id,
topic: range_key.topic,
Expand All @@ -731,6 +732,7 @@ impl BigTableClientImpl {
..Default::default()
};

// Backfill the Optional fields
if let Some(cell) = row.take_cell("data") {
notif.data = Some(to_string(cell.value, "data")?);
}
Expand All @@ -740,6 +742,10 @@ impl BigTableClientImpl {
.map_err(|e| DbError::Serialization(e.to_string()))?,
);
}
if let Some(cell) = row.take_cell("reliability_id") {
trace!("🚣 Is reliable");
notif.reliability_id = Some(to_string(cell.value, "reliability_id")?);
}

trace!("🚣 Deserialized message row: {:?}", &notif);
Ok(notif)
Expand Down Expand Up @@ -1171,6 +1177,15 @@ impl DbClient for BigTableClientImpl {
..Default::default()
});
}

if let Some(reliability_id) = message.reliability_id {
cells.push(cell::Cell {
qualifier: "reliability_id".to_owned(),
value: reliability_id.into_bytes(),
timestamp: expiry,
..Default::default()
});
}
row.add_cells(family, cells);
trace!("🉑 Adding row");
self.write_row(row).await?;
Expand Down Expand Up @@ -1287,6 +1302,7 @@ impl DbClient for BigTableClientImpl {
);

let messages = self.rows_to_notifications(rows)?;

// Note: Bigtable always returns a timestamp of None.
// Under Bigtable `current_timestamp` is instead initially read
// from [get_user].
Expand Down
5 changes: 5 additions & 0 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ pub struct NotificationRecord {
/// value before sending it to storage or a connection node.
#[serde(skip_serializing_if = "Option::is_none")]
updateid: Option<String>,
/// Internal Push Reliability tracking id. (Applied only to subscription updates generated
/// by Mozilla owned and consumed messages, like SendTab updates.)
#[serde(skip_serializing_if = "Option::is_none")]
reliability_id: Option<String>,
}

impl NotificationRecord {
Expand Down Expand Up @@ -337,6 +341,7 @@ impl NotificationRecord {
data: self.data,
headers: self.headers.map(|m| m.into()),
sortkey_timestamp: key.sortkey_timestamp,
reliability_id: None,
})
}

Expand Down
2 changes: 2 additions & 0 deletions autopush-common/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct Notification {
pub sortkey_timestamp: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub headers: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reliability_id: Option<String>,
}

pub const TOPIC_NOTIFICATION_PREFIX: &str = "01";
Expand Down
2 changes: 1 addition & 1 deletion scripts/convert_pem_to_x962.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Autopush will try to scan for known VAPID public keys to track. These keys
are specified in the header as x962 formatted strings. X962 is effectively
"raw" format and contains the two longs that are the coordinates for the
public key.
public key prefixed with a '\04` byte.

"""
import base64
Expand Down
Loading