Skip to content

Commit

Permalink
添加获取订阅频道的功能
Browse files Browse the repository at this point in the history
  • Loading branch information
heavyrian2012 committed Jul 20, 2023
1 parent 45b1b9c commit 15f7777
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.moquette.imhandler;

import cn.wildfirechat.common.ErrorCode;
import cn.wildfirechat.proto.ProtoConstants;
import cn.wildfirechat.proto.WFCMessage;
import io.moquette.spi.impl.Qos1PublishHandler;
import io.netty.buffer.ByteBuf;
import win.liyufan.im.IMTopic;

import java.util.List;

@Handler(IMTopic.ListenedChannelListTopic)
public class ChannelListenedListHandler extends IMHandler<Void> {
@Override
public ErrorCode action(ByteBuf ackPayload, String clientID, String fromUser, ProtoConstants.RequestSourceType requestSourceType, Void request, Qos1PublishHandler.IMCallback callback) {
List<String> channelInfoList = m_messagesStore.getListenedChannels(fromUser);
WFCMessage.IDListBuf.Builder builder = WFCMessage.IDListBuf.newBuilder();
builder.addAllId(channelInfoList);
byte[] data = builder.build().toByteArray();
ackPayload.ensureWritable(data.length).writeBytes(data);
return ErrorCode.ERROR_CODE_SUCCESS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4139,6 +4139,11 @@ public List<WFCMessage.ChannelInfo> searchChannel(String keyword, boolean buzzy,
return databaseStore.searchChannelFromDB(keyword, buzzy, page);
}


@Override
public List<String> getListenedChannels(String userId) {
return databaseStore.getUserChannels(userId);
}
@Override
public ErrorCode listenChannel(String operator, String channelId, boolean listen) {
HazelcastInstance hzInstance = m_Server.getHazelcastInstance();
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/spi/IMessagesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public String toString() {
ErrorCode transferChannel(String operator, String channelId, String newOwner);
ErrorCode destroyChannel(String operator, String channelId, boolean isAdmin);
List<WFCMessage.ChannelInfo> searchChannel(String keyword, boolean buzzy, int page);
List<String> getListenedChannels(String userId);
ErrorCode listenChannel(String operator, String channelId, boolean listen);
WFCMessage.ChannelInfo getChannelInfo(String channelId);
boolean canSendMessageInChannel(String user, String channelId);
Expand Down
23 changes: 9 additions & 14 deletions broker/src/main/java/io/moquette/spi/impl/Qos1PublishHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,15 @@ void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
String imtopic = topic.getTopic();
ByteBuf payload = msg.payload();
byte[] payloadContent = readBytesAndRewind(payload);
if(payloadContent.length == 0) {
ByteBuf ackPayload = Unpooled.buffer();
ackPayload.ensureWritable(1).writeByte(ERROR_CODE_INVALID_DATA.getCode());
sendPubAck(clientID, messageID, ackPayload, ERROR_CODE_INVALID_DATA);
return;
}

MemorySessionStore.Session session = m_sessionStore.getSession(clientID);
payloadContent = AES.AESDecrypt(payloadContent, session.getSecret(), true);
if(payloadContent == null) {
ByteBuf ackPayload = Unpooled.buffer();
ackPayload.ensureWritable(1).writeByte(ERROR_CODE_INVALID_DATA.getCode());
sendPubAck(clientID, messageID, ackPayload, ERROR_CODE_INVALID_DATA);
return;
if(payloadContent.length > 0) {
MemorySessionStore.Session session = m_sessionStore.getSession(clientID);
payloadContent = AES.AESDecrypt(payloadContent, session.getSecret(), true);
if(payloadContent == null) {
ByteBuf ackPayload = Unpooled.buffer();
ackPayload.ensureWritable(1).writeByte(ERROR_CODE_INVALID_DATA.getCode());
sendPubAck(clientID, messageID, ackPayload, ERROR_CODE_INVALID_DATA);
return;
}
}

imHandler(clientID, username, imtopic, payloadContent, (errorCode, ackPayload) -> sendPubAck(clientID, messageID, ackPayload, errorCode), ProtoConstants.RequestSourceType.Request_From_User);
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/win/liyufan/im/IMTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public interface IMTopic {
String ChannelSearchTopic = "CHS";
String ChannelListenTopic = "CHL";
String ChannelPullTopic = "CHP";
String ListenedChannelListTopic = "CHLL";

String GetTokenTopic = "GETTOKEN";
String DestroyUserTopic = "DESTROYUSER";
Expand Down

0 comments on commit 15f7777

Please sign in to comment.