Skip to content

Commit

Permalink
支持反向WebSocket多实例 (#96)
Browse files Browse the repository at this point in the history
* feat: ✨ 为ConnectFactory 添加统一的Producer API

* feat: ✨ 替换OverFlowImpl.start0里的过期api

* feat: ✨ 1

* do not log anything if printInfo is false

* Update Overflow.kt

---------

Co-authored-by: 人間工作 <[email protected]>
  • Loading branch information
kagg886 and MrXiaoM authored Sep 13, 2024
1 parent 2a696a8 commit 7eee8b5
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 50 deletions.
7 changes: 6 additions & 1 deletion onebot/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ setupMavenCentralPublication {
artifact(tasks.getByName("dokkaJavadocJar"))
}

tasks.test {
useJUnitPlatform()
}

dependencies {
implementation("org.projectlombok:lombok:1.18.26")
implementation("com.google.code.gson:gson:2.10.1")
Expand All @@ -27,6 +31,7 @@ dependencies {

annotationProcessor("org.java-websocket:Java-WebSocket:1.5.7")
annotationProcessor("org.projectlombok:lombok:1.18.26")

testCompileOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.6.4")
testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
testImplementation(kotlin("test"))
}
4 changes: 3 additions & 1 deletion onebot/src/main/kotlin/client/config/BotConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ class BotConfig(
*/
val retryRestMills: Long = 60000L,
val parentJob: Job? = null
)
) {
val isInReverseMode get() = reversedPort in 1..65535
}
36 changes: 36 additions & 0 deletions onebot/src/main/kotlin/client/connection/ConnectFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ConnectFactory private constructor(
* @return 连接实例
*/
@JvmOverloads
@Deprecated("please use build()")
suspend fun createWebsocketClient(scope: CoroutineScope = CoroutineScope(CoroutineName("WSClient"))): WSClient? {
val builder = StringBuilder()
val header = mutableMapOf<String, String>()
Expand Down Expand Up @@ -74,6 +75,7 @@ class ConnectFactory private constructor(
* @return 连接实例
*/
@JvmOverloads
@Deprecated("please use build()")
suspend fun createWebsocketServerAndWaitConnect(
scope: CoroutineScope = CoroutineScope(CoroutineName("WSServer"))
): Pair<WSServer, Bot>? {
Expand All @@ -92,6 +94,40 @@ class ConnectFactory private constructor(
return pair
}

@JvmOverloads
fun createProducer(
scope: CoroutineScope = CoroutineScope(CoroutineName("ConnectFactory"))
): OneBotProducer {
if (config.isInReverseMode) {
val address = InetSocketAddress(config.reversedPort)
return ReversedOneBotProducer(WSServer.create(scope, config, address, logger, actionHandler, config.token))
}
//使用正向包装
val builder = StringBuilder()
val header = mutableMapOf<String, String>()
builder.append(config.url)
if (config.isAccessToken) {
builder.append("?access_token=")
builder.append(config.token)
header["Authorization"] = "Bearer ${config.token}"
}

val url = builder.toString()

return PositiveOneBotProducer(WSClient.create(
scope,
config,
URI.create(url),
logger,
actionHandler,
config.retryTimes,
config.retryWaitMills,
config.retryRestMills,
header
))
}


companion object {
@JvmStatic
fun create(config: BotConfig, parent: Job? = null, logger: Logger = LoggerFactory.getLogger("Onebot")): ConnectFactory {
Expand Down
50 changes: 47 additions & 3 deletions onebot/src/main/kotlin/client/connection/WSClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ class WSClient(
) : WebSocketClient(uri, header), IAdapter {
private var retryCount = 0
private var scheduleClose = false

@OptIn(InternalCoroutinesApi::class)
private val connectDef = CompletableDeferred<Boolean>(config.parentJob).apply {
invokeOnCompletion(
onCancelling = true,
invokeImmediately = true
) { close() }
}

fun createBot(): Bot {
return Bot(this, config, actionHandler)
}
Expand Down Expand Up @@ -82,7 +84,14 @@ class WSClient(
scope.launch {
if (retryCount < retryTimes) {
retryCount++
logger.warn("等待 ${String.format("%.1f", retryWaitMills / 1000.0F)} 秒后重连 (第 $retryCount/$retryTimes 次)")
logger.warn(
"等待 ${
String.format(
"%.1f",
retryWaitMills / 1000.0F
)
} 秒后重连 (第 $retryCount/$retryTimes 次)"
)
delay(retryWaitMills)
} else {
retryCount = 0
Expand All @@ -106,9 +115,44 @@ class WSClient(
}

companion object {
suspend fun createAndConnect(scope: CoroutineScope, config: BotConfig, uri: URI, logger: Logger, actionHandler: ActionHandler, retryTimes: Int, retryWaitMills: Long, retryRestMills: Long, header: Map<String, String> = mapOf()): WSClient? {
val ws = WSClient(scope, config, uri, logger, actionHandler, retryTimes, retryWaitMills, retryRestMills, header)
suspend fun createAndConnect(
scope: CoroutineScope,
config: BotConfig,
uri: URI,
logger: Logger,
actionHandler: ActionHandler,
retryTimes: Int,
retryWaitMills: Long,
retryRestMills: Long,
header: Map<String, String> = mapOf()
): WSClient? {
val ws =
WSClient(scope, config, uri, logger, actionHandler, retryTimes, retryWaitMills, retryRestMills, header)
return ws.takeIf { ws.connectSuspend() }
}

fun create(
scope: CoroutineScope,
config: BotConfig,
uri: URI,
logger: Logger,
actionHandler: ActionHandler,
retryTimes: Int,
retryWaitMills: Long,
retryRestMills: Long,
header: Map<String, String> = mapOf()
): WSClient {
return WSClient(
scope,
config,
uri,
logger,
actionHandler,
retryTimes,
retryWaitMills,
retryRestMills,
header
)
}
}
}
65 changes: 57 additions & 8 deletions onebot/src/main/kotlin/client/connection/WSServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@ import cn.evolvefield.onebot.client.config.BotConfig
import cn.evolvefield.onebot.client.core.Bot
import cn.evolvefield.onebot.client.handler.ActionHandler
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.java_websocket.WebSocket
import org.java_websocket.WebSocketServerFactory
import org.java_websocket.framing.CloseFrame
import org.java_websocket.handshake.ClientHandshake
import org.java_websocket.server.DefaultWebSocketServerFactory
import org.java_websocket.server.WebSocketServer
import org.slf4j.Logger
import java.net.InetSocketAddress
import kotlin.time.Duration

/**
* Project: onebot-client
Expand All @@ -25,20 +33,38 @@ class WSServer(
override val actionHandler: ActionHandler,
private val token: String
) : WebSocketServer(address), IAdapter {
private var bot: Bot? = null
// private var bot: Bot? = null
private val bots: MutableList<Bot> = mutableListOf()
private val botChannel = Channel<Bot>()
private val muteX = Mutex()
val connectDef = CompletableDeferred<Bot>(config.parentJob)

val closeHandler = mutableListOf<() -> Unit>()

init {
//等待测试
setWebSocketFactory(object : WebSocketServerFactory by DefaultWebSocketServerFactory() {
override fun close() {
for (i in closeHandler) {
i()
}
}
})
}

override fun onStart() {
logger.info("▌ 反向 WebSocket 服务端已在 $address 启动")
logger.info("▌ 正在等待客户端连接...")
}

suspend fun awaitNewBot(timeout: Duration = Duration.INFINITE): Bot {
return withTimeout(timeout) {
botChannel.receive()
}
}

override fun onOpen(conn: WebSocket, handshake: ClientHandshake) {
if (handshake.resourceDescriptor != "/") return
if (bot?.channel?.isOpen == true) {
conn.close(CloseFrame.NORMAL, "Overflow 的反向 WS 适配器暂不支持多客户端连接")
return
}
if (token.isNotBlank()) {
if (handshake.hasFieldValue("Authorization")) {
val param = handshake.getFieldValue("Authorization").run {
Expand All @@ -60,8 +86,15 @@ class WSServer(
}
}
logger.info("▌ 反向 WebSocket 客户端 ${conn.remoteSocketAddress} 已连接 ┈━═☆")
bot = Bot(conn, config, actionHandler).also { it.conn = conn }
connectDef.complete(bot!!)
runBlocking {
val bot = muteX.withLock {
Bot(conn, config, actionHandler).also { it.conn = conn }.apply {
connectDef.complete(this)
}
}
bots.add(bot)
botChannel.send(bot)
}
}

override fun onMessage(conn: WebSocket, message: String) = onReceiveMessage(message)
Expand All @@ -73,13 +106,20 @@ class WSServer(
CloseCode.valueOf(code) ?: code
)
unlockMutex()
runBlocking {
muteX.withLock {
bots.removeIf { it.conn == conn }
}
}
}

override fun onError(conn: WebSocket, ex: Exception) {
//ws连接阶段时conn为null
override fun onError(conn: WebSocket?, ex: Exception) {
logger.error("▌ 反向 WebSocket 客户端连接出现错误 {} 或未连接 ┈━═☆", ex.localizedMessage)
}

companion object {
@Deprecated("please use create() and awaitNewBot()")
suspend fun createAndWaitConnect(
scope: CoroutineScope, config: BotConfig,
address: InetSocketAddress, logger: Logger,
Expand All @@ -89,5 +129,14 @@ class WSServer(
val bot = ws.connectDef.await()
return ws to bot
}

fun create(
scope: CoroutineScope, config: BotConfig,
address: InetSocketAddress, logger: Logger,
actionHandler: ActionHandler, token: String
): WSServer {
val ws = WSServer(scope, config, address, logger, actionHandler, token).also { it.start() }
return ws
}
}
}
41 changes: 41 additions & 0 deletions onebot/src/main/kotlin/client/connection/platform-connection.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cn.evolvefield.onebot.client.connection

import cn.evolvefield.onebot.client.core.Bot
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration

interface OneBotProducer {
fun invokeOnClose(block: () -> Unit)
fun close()
suspend fun awaitNewBotConnection(duration: Duration = Duration.INFINITE): Bot?
}

class PositiveOneBotProducer(private val client: WSClient) : OneBotProducer {
override fun invokeOnClose(block: () -> Unit) = TODO("客户端暂不支持断线重连")

override fun close() = client.close()

override suspend fun awaitNewBotConnection(duration: Duration): Bot? {
return kotlin.runCatching {
withTimeout(duration) {
if (client.connectSuspend()) client.createBot() else null
}
}.getOrNull()
}
}

class ReversedOneBotProducer(private val server: WSServer) : OneBotProducer {
override fun invokeOnClose(block: () -> Unit) {
server.closeHandler.add(block)
}

override fun close() = server.stop()

override suspend fun awaitNewBotConnection(duration: Duration): Bot? {
return kotlin.runCatching {
withTimeout(duration) {
server.awaitNewBot()
}
}.getOrNull()
}
}
Loading

0 comments on commit 7eee8b5

Please sign in to comment.