Skip to content

Commit

Permalink
feat(user_attachment): user attachment implementation (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
DariusIMP authored Feb 23, 2024
1 parent 450a01e commit 97b2a5c
Show file tree
Hide file tree
Showing 25 changed files with 830 additions and 87 deletions.
6 changes: 4 additions & 2 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.zenoh.publication.Put
import io.zenoh.query.*
import io.zenoh.queryable.Query
import io.zenoh.queryable.Queryable
import io.zenoh.sample.Attachment
import io.zenoh.sample.Sample
import io.zenoh.selector.Selector
import io.zenoh.subscriber.Reliability
Expand Down Expand Up @@ -387,12 +388,13 @@ class Session private constructor(private val config: Config) : AutoCloseable {
timeout: Duration,
target: QueryTarget,
consolidation: ConsolidationMode,
value: Value?
value: Value?,
attachment: Attachment?,
): R? {
if (jniSession == null) {
throw sessionClosedException
}
return jniSession?.performGet(selector, callback, onClose, receiver, timeout, target, consolidation, value)
return jniSession?.performGet(selector, callback, onClose, receiver, timeout, target, consolidation, value, attachment)
}

@Throws(ZenohException::class)
Expand Down
42 changes: 33 additions & 9 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.zenoh.exceptions.ZenohException
import io.zenoh.prelude.SampleKind
import io.zenoh.publication.CongestionControl
import io.zenoh.publication.Priority
import io.zenoh.sample.Attachment
import io.zenoh.value.Value

/**
Expand All @@ -32,20 +33,39 @@ internal class JNIPublisher(private val ptr: Long) {
* Put value through the publisher.
*
* @param value The [Value] to be put.
* @param attachment Optional [Attachment].
*/
@Throws(ZenohException::class)
fun put(value: Value) {
putViaJNI(value.payload, value.encoding.knownEncoding.ordinal, ptr)
fun put(value: Value, attachment: Attachment?) {
putViaJNI(value.payload, value.encoding.knownEncoding.ordinal, attachment?.let { encodeAttachment(it) }, ptr)
}

/**
* Write operation.
*
* @param kind The [SampleKind].
* @param value The [Value] to be written.
* @param attachment Optional [Attachment].
*/
@Throws(ZenohException::class)
fun write(kind: SampleKind, value: Value) {
writeViaJNI(value.payload, value.encoding.knownEncoding.ordinal, kind.ordinal, ptr)
fun write(kind: SampleKind, value: Value, attachment: Attachment?) {
writeViaJNI(
value.payload,
value.encoding.knownEncoding.ordinal,
kind.ordinal,
attachment?.let { encodeAttachment(it) },
ptr
)
}

/**
* Delete operation.
*
* @param attachment Optional [Attachment].
*/
@Throws(ZenohException::class)
fun delete() {
deleteViaJNI(ptr)
fun delete(attachment: Attachment?) {
deleteViaJNI(attachment?.let { encodeAttachment(it) }, ptr)
}

/**
Expand Down Expand Up @@ -106,13 +126,17 @@ internal class JNIPublisher(private val ptr: Long) {

/** Puts through the native Publisher. */
@Throws(ZenohException::class)
private external fun putViaJNI(valuePayload: ByteArray, valueEncoding: Int, ptr: Long)
private external fun putViaJNI(
valuePayload: ByteArray, valueEncoding: Int, encodedAttachment: ByteArray?, ptr: Long
)

@Throws(ZenohException::class)
private external fun writeViaJNI(payload: ByteArray, encoding: Int, sampleKind: Int, ptr: Long)
private external fun writeViaJNI(
payload: ByteArray, encoding: Int, sampleKind: Int, encodedAttachment: ByteArray?, ptr: Long
)

@Throws(ZenohException::class)
private external fun deleteViaJNI(ptr: Long)
private external fun deleteViaJNI(encodedAttachment: ByteArray?, ptr: Long)

/** Frees the underlying native Publisher. */
private external fun freePtrViaJNI(ptr: Long)
Expand Down
4 changes: 3 additions & 1 deletion zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ internal class JNIQuery(private val ptr: Long) {
sample.kind.ordinal,
timestampEnabled,
if (timestampEnabled) sample.timestamp!!.ntpValue() else 0,
sample.attachment?.let { encodeAttachment(it) },
)
}

Expand All @@ -58,7 +59,8 @@ internal class JNIQuery(private val ptr: Long) {
valueEncoding: Int,
sampleKind: Int,
timestampEnabled: Boolean,
timestampNtp64: Long
timestampNtp64: Long,
attachment: ByteArray?,
)

@Throws(ZenohException::class)
Expand Down
29 changes: 21 additions & 8 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import io.zenoh.publication.Put
import io.zenoh.query.*
import io.zenoh.queryable.Query
import io.zenoh.queryable.Queryable
import io.zenoh.sample.Attachment
import io.zenoh.sample.Sample
import io.zenoh.selector.Selector
import io.zenoh.subscriber.Reliability
Expand Down Expand Up @@ -80,13 +81,15 @@ internal class JNISession {
keyExpr: KeyExpr, callback: Callback<Sample>, onClose: () -> Unit, receiver: R?, reliability: Reliability
): Subscriber<R> {
val subCallback =
JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid ->
JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, attachmentBytes ->
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val attachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
val sample = Sample(
KeyExpr(JNIKeyExpr(keyExprPtr)),
Value(payload, Encoding(KnownEncoding.fromInt(encoding))),
SampleKind.fromInt(kind),
timestamp
timestamp,
attachment
)
callback.run(sample)
}
Expand All @@ -101,12 +104,13 @@ internal class JNISession {
keyExpr: KeyExpr, callback: Callback<Query>, onClose: () -> Unit, receiver: R?, complete: Boolean
): Queryable<R> {
val queryCallback =
JNIQueryableCallback { keyExprPtr: Long, selectorParams: String, withValue: Boolean, payload: ByteArray?, encoding: Int, queryPtr: Long ->
JNIQueryableCallback { keyExprPtr: Long, selectorParams: String, withValue: Boolean, payload: ByteArray?, encoding: Int, attachmentBytes: ByteArray, queryPtr: Long ->
val jniQuery = JNIQuery(queryPtr)
val keyExpression = KeyExpr(JNIKeyExpr(keyExprPtr))
val selector = Selector(keyExpression, selectorParams)
val value: Value? = if (withValue) Value(payload!!, Encoding(KnownEncoding.fromInt(encoding))) else null
val query = Query(keyExpression, selector, value, jniQuery)
val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
val query = Query(keyExpression, selector, value, decodedAttachment, jniQuery)
callback.run(query)
}
val queryableRawPtr =
Expand All @@ -123,17 +127,20 @@ internal class JNISession {
timeout: Duration,
target: QueryTarget,
consolidation: ConsolidationMode,
value: Value?
value: Value?,
attachment: Attachment?
): R? {
val getCallback =
JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean ->
JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, attachmentBytes: ByteArray ->
if (success) {
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) }
val sample = Sample(
KeyExpr(JNIKeyExpr(keyExprPtr)),
Value(payload, Encoding(KnownEncoding.fromInt(encoding))),
SampleKind.fromInt(kind),
timestamp
timestamp,
decodedAttachment
)
val reply = Reply.Success(replierId, sample)
callback.run(reply)
Expand All @@ -153,6 +160,7 @@ internal class JNISession {
timeout.toMillis(),
target.ordinal,
consolidation.ordinal,
attachment?.let { encodeAttachment(it) }
)
} else {
getWithValueViaJNI(
Expand All @@ -166,6 +174,7 @@ internal class JNISession {
consolidation.ordinal,
value.payload,
value.encoding.knownEncoding.ordinal,
attachment?.let { encodeAttachment(it) }
)
}
return receiver
Expand Down Expand Up @@ -195,6 +204,7 @@ internal class JNISession {
put.congestionControl.ordinal,
put.priority.value,
put.kind.ordinal,
put.attachment?.let { encodeAttachment(it) }
)
}

Expand Down Expand Up @@ -246,6 +256,7 @@ internal class JNISession {
timeoutMs: Long,
target: Int,
consolidation: Int,
attachmentBytes: ByteArray?,
)

@Throws(ZenohException::class)
Expand All @@ -259,7 +270,8 @@ internal class JNISession {
target: Int,
consolidation: Int,
payload: ByteArray,
encoding: Int
encoding: Int,
attachmentBytes: ByteArray?,
)

@Throws(ZenohException::class)
Expand All @@ -271,5 +283,6 @@ internal class JNISession {
congestionControl: Int,
priority: Int,
kind: Int,
attachmentBytes: ByteArray?,
)
}
74 changes: 74 additions & 0 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.jni

import io.zenoh.sample.Attachment

/**
* Encode attachment as a byte array.
*/
internal fun encodeAttachment(attachment: Attachment): ByteArray {
return attachment.values.map {
val key = it.first
val keyLength = key.size.toByteArray()
val value = it.second
val valueLength = value.size.toByteArray()
keyLength + key + valueLength + value
}.reduce { acc, bytes -> acc + bytes }
}

/**
* Decode an attachment as a byte array, recreating the original [Attachment].
*/
internal fun decodeAttachment(attachmentBytes: ByteArray): Attachment {
var idx = 0
var sliceSize: Int
val pairs: MutableList<Pair<ByteArray, ByteArray>> = mutableListOf()
while (idx < attachmentBytes.size) {
sliceSize = attachmentBytes.sliceArray(IntRange(idx, idx + Int.SIZE_BYTES - 1)).toInt()
idx += Int.SIZE_BYTES

val key = attachmentBytes.sliceArray(IntRange(idx, idx + sliceSize - 1))
idx += sliceSize

sliceSize = attachmentBytes.sliceArray(IntRange(idx, idx + Int.SIZE_BYTES - 1)).toInt()
idx += Int.SIZE_BYTES

val value = attachmentBytes.sliceArray(IntRange(idx, idx + sliceSize - 1))
idx += sliceSize

pairs.add(key to value)
}
return Attachment(pairs)
}

/**
* Converts an integer into a byte array with little endian format.
*/
fun Int.toByteArray(): ByteArray {
val result = ByteArray(UInt.SIZE_BYTES)
(0 until UInt.SIZE_BYTES).forEach {
result[it] = this.shr(Byte.SIZE_BITS * it).toByte()
}
return result
}

/**
* To int. The byte array is expected to be in Little Endian format.
*
* @return The integer value.
*/
fun ByteArray.toInt(): Int =
(((this[3].toUInt() and 0xFFu) shl 24) or ((this[2].toUInt() and 0xFFu) shl 16) or ((this[1].toUInt() and 0xFFu) shl 8) or (this[0].toUInt() and 0xFFu)).toInt()
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal fun interface JNIGetCallback {
encoding: Int,
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean
timestampIsValid: Boolean,
attachment: ByteArray,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ internal fun interface JNIQueryableCallback {
withValue: Boolean,
payload: ByteArray?,
encoding: Int,
attachmentBytes: ByteArray,
queryPtr: Long)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal fun interface JNISubscriberCallback {
encoding: Int,
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean
timestampIsValid: Boolean,
attachment: ByteArray,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Delete private constructor(
congestionControl: CongestionControl,
priority: Priority,
kind: SampleKind
) : Put(keyExpr, value, congestionControl, priority, kind) {
) : Put(keyExpr, value, congestionControl, priority, kind, null) {

companion object {
/**
Expand Down
Loading

0 comments on commit 97b2a5c

Please sign in to comment.