diff --git a/WuKongIMSDK/Classes/WKOptions.h b/WuKongIMSDK/Classes/WKOptions.h index e72e32a..ab04a6f 100644 --- a/WuKongIMSDK/Classes/WKOptions.h +++ b/WuKongIMSDK/Classes/WKOptions.h @@ -138,6 +138,9 @@ typedef WKConnectInfo*_Nonnull(^WKConnectInfoCallback)(void); @property(nonatomic,assign) NSInteger expireMsgCheckInterval; // 过期消息检查间隔 单位秒 @property(nonatomic,assign) NSInteger expireMsgLimit; // 过期消息每次查询数量 +@property(nonatomic,assign) NSInteger sendFrequency; // 消息发送延迟时间 单位毫秒 +@property(nonatomic,assign) NSInteger sendMaxCountOfEach; // 消息每次发送最大数量 + @end NS_ASSUME_NONNULL_END diff --git a/WuKongIMSDK/Classes/WKOptions.m b/WuKongIMSDK/Classes/WKOptions.m index 1b2e7d8..78ea778 100644 --- a/WuKongIMSDK/Classes/WKOptions.m +++ b/WuKongIMSDK/Classes/WKOptions.m @@ -44,6 +44,9 @@ -(id) init { self.receiptFlushInterval = 2; self.channelRequestMaxLimit = 10; + + self.sendFrequency = 100; + self.sendMaxCountOfEach = 5; } return self; } diff --git a/WuKongIMSDK/Classes/db/WKMessageDB.h b/WuKongIMSDK/Classes/db/WKMessageDB.h index f313616..0a58aa9 100644 --- a/WuKongIMSDK/Classes/db/WKMessageDB.h +++ b/WuKongIMSDK/Classes/db/WKMessageDB.h @@ -220,6 +220,9 @@ NS_ASSUME_NONNULL_BEGIN -(void) deleteMessagesWithClientSeqs:(NSArray*)ids; +-(void) deleteMessagesWithMessageIDs:(NSArray*)messageIDs; + +-(void) deleteMessagesWithMessageIDs:(NSArray*)messageIDs db:(FMDatabase*)db; /** 彻底将消息从数据库删除 (deleteMessage只是标记为删除) diff --git a/WuKongIMSDK/Classes/db/WKMessageDB.m b/WuKongIMSDK/Classes/db/WKMessageDB.m index 5b5aae6..7d264d6 100644 --- a/WuKongIMSDK/Classes/db/WKMessageDB.m +++ b/WuKongIMSDK/Classes/db/WKMessageDB.m @@ -81,6 +81,9 @@ #define SQL_MESSAGE_UPDATE_EXTRA [NSString stringWithFormat:@"update %@ set extra=? where id=?",TB_MESSAGE] // 删除指定id的消息 #define SQL_MESSAGE_DELETE_MESSAGE_ID [NSString stringWithFormat:@"update %@ set is_deleted=1 where message_id=?",TB_MESSAGE] + +#define SQL_MESSAGE_DELETE_MESSAGE_IDS [NSString stringWithFormat:@"update %@ set is_deleted=1 where message_id in ",TB_MESSAGE] + // 删除指定id的消息 #define SQL_MESSAGE_DELETE_CLIENT_SEQ [NSString stringWithFormat:@"update %@ set is_deleted=1 where id=?",TB_MESSAGE] @@ -770,6 +773,27 @@ -(void) deleteMessagesWithClientSeqs:(NSArray*)ids { }]; } +-(void) deleteMessagesWithMessageIDs:(NSArray*)messageIDs { + __weak typeof(self) weakSelf = self; + [WKDB.sharedDB.dbQueue inDatabase:^(FMDatabase * _Nonnull db) { + [weakSelf deleteMessagesWithMessageIDs:messageIDs db:db]; + }]; +} + +-(void) deleteMessagesWithMessageIDs:(NSArray*)messageIDs db:(FMDatabase*)db { + if(messageIDs && messageIDs.count==0) { + return; + } + if(messageIDs.count == 1) { + NSNumber *messageID = messageIDs[0]; + [db executeUpdate:SQL_MESSAGE_DELETE_MESSAGE_ID,messageID]; + }else { + NSString *idStrs = [messageIDs componentsJoinedByString:@","]; + [db executeUpdate:[NSString stringWithFormat:@"%@ (%@)",SQL_MESSAGE_DELETE_MESSAGE_IDS,idStrs]]; + } + +} + - (void)destoryMessage:(WKMessage *)message { [[WKDB sharedDB].dbQueue inDatabase:^(FMDatabase * _Nonnull db) { diff --git a/WuKongIMSDK/Classes/db/WKMessageExtraDB.m b/WuKongIMSDK/Classes/db/WKMessageExtraDB.m index 6207379..d785e5f 100644 --- a/WuKongIMSDK/Classes/db/WKMessageExtraDB.m +++ b/WuKongIMSDK/Classes/db/WKMessageExtraDB.m @@ -55,6 +55,7 @@ -(void) addOrUpdateMessageExtras:(NSArray*)messageExtras { return; } [[WKDB sharedDB].dbQueue inTransaction:^(FMDatabase * _Nonnull db, BOOL * _Nonnull rollback) { + NSMutableArray *needDeleteMessageIDs = [NSMutableArray array]; for (WKMessageExtra *messageExtra in messageExtras) { NSString *extraStr = @""; if(messageExtra.extra) { @@ -65,6 +66,7 @@ -(void) addOrUpdateMessageExtras:(NSArray*)messageExtras { readedAt = [messageExtra.readedAt timeIntervalSince1970]; } [db executeUpdate:SQL_MESSAGE_EXTRA_INSERT_OR_UPDATE,@(messageExtra.messageID),@(messageExtra.messageSeq),messageExtra.channelID?:@"",@(messageExtra.channelType),@(messageExtra.readed),@(readedAt),@(messageExtra.readedCount),@(messageExtra.unreadCount),@(messageExtra.revoke),messageExtra.revoker?:@"",messageExtra.contentEditData?:@"",@(messageExtra.editedAt),extraStr,@(messageExtra.extraVersion)]; + } }]; } diff --git a/WuKongIMSDK/Classes/manager/WKChatManager.m b/WuKongIMSDK/Classes/manager/WKChatManager.m index ac8e347..80608e2 100644 --- a/WuKongIMSDK/Classes/manager/WKChatManager.m +++ b/WuKongIMSDK/Classes/manager/WKChatManager.m @@ -29,6 +29,7 @@ #import "WKMessageExtraDB.h" #import "WKConversationManagerInner.h" #import "WKConversationLastMessageAndUnreadCount.h" +#import "WKMessageQueueManager.h" @interface WKChatManager () @@ -253,32 +254,37 @@ -(WKMessage*) sendMessage:(WKMessage*)message { -(WKMessage*) sendMessage:(WKMessage*)message addRetryQueue:(BOOL)addRetryQueue{ - - dispatch_async(self.sendMessageQueue,^{ - // 发送消息 - WKSendPacket *sendPacket = [WKSendPacket new]; - sendPacket.header.showUnread = message.header?message.header.showUnread:0; - sendPacket.header.noPersist = message.header?message.header.noPersist:0; - WKSetting *setting = message.setting; - if(message.topic && ![message.topic isEqualToString:@""]) { - setting.topic = true; - } - sendPacket.setting = setting; - sendPacket.clientSeq = message.clientSeq; - sendPacket.clientMsgNo = message.clientMsgNo; - sendPacket.channelId = message.channel.channelId; - sendPacket.channelType = message.channel.channelType; - sendPacket.expire = message.expire; - sendPacket.topic = message.topic; - sendPacket.payload = message.content.encode; - if(addRetryQueue) { // 添加到重试队列 [[WKRetryManager shared] add:message]; } - [[[WKSDK shared] connectionManager] sendPacket:sendPacket]; - - }); + [WKMessageQueueManager.shared sendMessage:message]; + +// dispatch_async(self.sendMessageQueue,^{ +// // 发送消息 +// WKSendPacket *sendPacket = [WKSendPacket new]; +// sendPacket.header.showUnread = message.header?message.header.showUnread:0; +// sendPacket.header.noPersist = message.header?message.header.noPersist:0; +// WKSetting *setting = message.setting; +// if(message.topic && ![message.topic isEqualToString:@""]) { +// setting.topic = true; +// } +// sendPacket.setting = setting; +// sendPacket.clientSeq = message.clientSeq; +// sendPacket.clientMsgNo = message.clientMsgNo; +// sendPacket.channelId = message.channel.channelId; +// sendPacket.channelType = message.channel.channelType; +// sendPacket.expire = message.expire; +// sendPacket.topic = message.topic; +// sendPacket.payload = message.content.encode; +// +// if(addRetryQueue) { +// // 添加到重试队列 +// [[WKRetryManager shared] add:message]; +// } +// [[[WKSDK shared] connectionManager] sendPacket:sendPacket]; +// +// }); @@ -452,13 +458,16 @@ -(void) handleSendack:(NSArray *)sendackArray { } // 调用委托 - NSArray *messages = [[WKMessageDB shared] getMessagesWithClientSeqs:clientIDs]; - if(messages && messages.count>0) { - for (NSInteger i=0; i0) { + NSArray *messages = [[WKMessageDB shared] getMessagesWithClientSeqs:clientIDs]; + if(messages && messages.count>0) { + NSLog(@"messages--aaaa------->%lu",messages.count); + for (NSInteger i=0; i%@",error); return; } + [[WKMessageExtraDB shared] addOrUpdateMessageExtras:@[message.remoteExtra]]; [weakSelf callMessageUpdateDelegate:message]; + + if(message.remoteExtra.isMutualDeleted) { // 如果是双向删除 则删除此消息 + [weakSelf deleteMessage:message]; + } + }); } @@ -1701,11 +1716,31 @@ -(void) syncMessageExtra:(WKChannel*)channel complete:(void(^)(NSError *error))c if(messages && messages.count>0) { NSDictionary *reactionDict= [[WKReactionDB shared] getReactionDictionary:messageIDs]; NSInteger i = messages.count - 1; + + // 消息更新通知 for (WKMessage *message in messages) { message.reactions = reactionDict[[NSString stringWithFormat:@"%llu", message.messageId]]; [weakSelf callMessageUpdateDelegate:message left:i total:messages.count]; i--; } + + // 消息删除通知 + NSMutableArray *deletedMessages = [NSMutableArray array]; + for (WKMessage *message in messages) { + for (WKMessageExtra *messageExtra in results) { + if(messageExtra.isMutualDeleted) { + message.isDeleted = true; + [deletedMessages addObject:message]; + break; + } + } + } + if(deletedMessages.count>0) { + for (WKMessage *message in deletedMessages) { + [weakSelf deleteMessage:message]; + } + } + } // [weakSelf updateMessageExtraFromRemote:results]; } diff --git a/WuKongIMSDK/Classes/manager/WKConnectionManager.h b/WuKongIMSDK/Classes/manager/WKConnectionManager.h index 47c6cce..2e70eb7 100644 --- a/WuKongIMSDK/Classes/manager/WKConnectionManager.h +++ b/WuKongIMSDK/Classes/manager/WKConnectionManager.h @@ -91,6 +91,7 @@ typedef enum : NSUInteger { */ -(void) sendPacket:(WKPacket*)packet; +-(void) writeData:(NSData*) data; /** 发送ping包 diff --git a/WuKongIMSDK/Classes/manager/WKConnectionManager.m b/WuKongIMSDK/Classes/manager/WKConnectionManager.m index d7a1e26..7ef4770 100644 --- a/WuKongIMSDK/Classes/manager/WKConnectionManager.m +++ b/WuKongIMSDK/Classes/manager/WKConnectionManager.m @@ -26,6 +26,7 @@ #import "WKReminderManager.h" #import "WKChatManagerInner.h" #import "WKConversationManagerInner.h" +#import "WKMessageQueueManager.h" @interface WKConnectionManager () @@ -428,7 +429,11 @@ -(void) sendPing{ // 发送包 -(void) sendPacket:(WKPacket*)packet{ NSData *data = [[WKSDK shared].coder encode:packet]; - [self.ssocket writeData:data withTimeout:-1 tag:0]; + [self writeData:data]; +} + +-(void) writeData:(NSData*) data { + [self.ssocket writeData:data withTimeout:1 tag:0]; } - (NSLock *)delegateLock { @@ -453,8 +458,10 @@ - (void)removeDelegate:(id) delegate { -(void) connectStatusChange { if(self.connectStatusInner == WKConnected) { [[WKRetryManager shared] start]; + [WKMessageQueueManager.shared start]; }else { [[WKRetryManager shared] stop]; + [[WKMessageQueueManager shared] stop]; } if(self.onConnectStatusChange) { self.onConnectStatusChange(self.connectStatusInner); @@ -519,6 +526,9 @@ -(void) unpacket:(NSData*)packetData callback:(void(^) (NSArray *data) [dataList addObject:data]; }]; lenAfter = self.tempBufferData.length; + if(lenAfter>0) { + NSLog(@"有剩余未被解析的包->%lu",lenAfter); + } } while (lenBefore != lenAfter && lenAfter >= 1); if (dataList.count > 0) { callback(dataList); @@ -568,19 +578,23 @@ -(NSMutableData*) unpackOneLM:(NSMutableData*)packData callback:(void(^) (NSData } while (hasLength); if (!remLengthFull) { + NSLog(@"包长度没有读出来"); return packData; } int remLengthLength = pos - fixedHeaderLength; // 剩余长度的长度 if (fixedHeaderLength + remLengthLength + remLength > length) { // 固定头的长度 + 剩余长度的长度 + 剩余长度 如果大于 总长度说明分包了 + NSLog(@"分包了..."); return packData; }else { if (fixedHeaderLength + remLengthLength + remLength == length) { // 刚好一个包 + NSLog(@"刚好一个包"); callback(packData); return [[NSMutableData alloc] init]; } else { // 粘包 大于1个包 + NSLog(@"粘包 大于1个包"); int packetLength = fixedHeaderLength + remLengthLength + remLength;; callback([packData subdataWithRange:NSMakeRange(0, packetLength)]); return [[NSMutableData alloc] initWithData:[packData subdataWithRange:NSMakeRange(packetLength, length-packetLength)]]; @@ -653,6 +667,7 @@ -(void) handlePacketData:(NSArray*)dataList { } -(void) handlePackets:(NSArray*)packets { + NSLog(@"handlePackets----------------start---->%lu",packets.count); NSDictionary*>* packetDict = [self packetGroup:packets]; for (NSNumber *packetTypeNum in packetDict.allKeys) { NSArray *packetList = [packetDict objectForKey:packetTypeNum]; @@ -661,11 +676,16 @@ -(void) handlePackets:(NSArray*)packets { [[WKSDK shared].chatManager handleSendack:(NSArray *)packetList]; break; case WK_RECV: - [[WKSDK shared].chatManager handleRecv:(NSArray *)packetList]; + [[WKSDK shared].chatManager handleRecv:(NSArray *)packetList]; + break; + case WK_PONG: + break; default: + NSLog(@"未知的数据包-->[%d]",packetTypeNum.unsignedIntValue); break; } } + NSLog(@"handlePackets----------------end---->%lu",packets.count); } -( WKConnectStatus) connectStatus { diff --git a/WuKongIMSDK/Classes/manager/WKMessageQueueManager.h b/WuKongIMSDK/Classes/manager/WKMessageQueueManager.h new file mode 100644 index 0000000..5217b9f --- /dev/null +++ b/WuKongIMSDK/Classes/manager/WKMessageQueueManager.h @@ -0,0 +1,25 @@ +// +// WKMessageQueueManager.h +// WuKongIMSDK +// +// Created by tt on 2023/11/15. +// + +#import +#import "WKMessage.h" + +NS_ASSUME_NONNULL_BEGIN + +@interface WKMessageQueueManager : NSObject + ++ (WKMessageQueueManager *)shared; + +-(void) start; + +-(void) stop; + +- (void)sendMessage:(WKMessage *)message; + +@end + +NS_ASSUME_NONNULL_END diff --git a/WuKongIMSDK/Classes/manager/WKMessageQueueManager.m b/WuKongIMSDK/Classes/manager/WKMessageQueueManager.m new file mode 100644 index 0000000..a7bbc3f --- /dev/null +++ b/WuKongIMSDK/Classes/manager/WKMessageQueueManager.m @@ -0,0 +1,115 @@ +// +// WKMessageQueueManager.m +// WuKongIMSDK +// +// Created by tt on 2023/11/15. +// + +#import "WKMessageQueueManager.h" +#import "WKMessage.h" +#import "WKSendPacket.h" +#import "WKSDK.h" +#import "WKConnectionManager.h" + +@interface WKMessageQueueManager () + +@property (nonatomic, strong) NSMutableArray *sendPackets; + +@property(nonatomic,strong) NSTimer *timer; + +@end + +@implementation WKMessageQueueManager + + +static WKMessageQueueManager *_instance; ++ (id)allocWithZone:(NSZone *)zone +{ + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + _instance = [super allocWithZone:zone]; + }); + return _instance; +} ++ (WKMessageQueueManager *)shared +{ + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + _instance = [[self alloc] init]; + }); + return _instance; +} +- (NSMutableArray *)sendPackets { + if(!_sendPackets) { + _sendPackets = [NSMutableArray array]; + } + return _sendPackets; +} + +- (void)sendMessage:(WKMessage *)message { + // 发送消息 + WKSendPacket *sendPacket = [WKSendPacket new]; + sendPacket.header.showUnread = message.header?message.header.showUnread:0; + sendPacket.header.noPersist = message.header?message.header.noPersist:0; + WKSetting *setting = message.setting; + if(message.topic && ![message.topic isEqualToString:@""]) { + setting.topic = true; + } + sendPacket.setting = setting; + sendPacket.clientSeq = message.clientSeq; + sendPacket.clientMsgNo = message.clientMsgNo; + sendPacket.channelId = message.channel.channelId; + sendPacket.channelType = message.channel.channelType; + sendPacket.expire = message.expire; + sendPacket.topic = message.topic; + sendPacket.payload = message.content.encode; + [self.sendPackets addObject:sendPacket]; + + +} + +-(void) start { + if(self.timer) { + [self.timer invalidate]; + self.timer = nil; + } + CGFloat delay = (double)WKSDK.shared.options.sendFrequency/1000.0f; + NSLog(@"delay--->%0.2f",delay); + self.timer = [NSTimer scheduledTimerWithTimeInterval:delay target:self selector:@selector(flushQueue) userInfo:nil repeats:YES]; + +} + +-(void) stop { + [self.sendPackets removeAllObjects]; + if(self.timer) { + [self.timer invalidate]; + self.timer = nil; + } +} + +-(void) flushQueue { + NSMutableData *sendPacketDatas; + NSInteger sendPacketCount = 0; + while (self.sendPackets.count>0) { + if(!sendPacketDatas) { + sendPacketDatas = [[NSMutableData alloc] init]; + } + + WKSendPacket *sendPacket = self.sendPackets.firstObject; + [self.sendPackets removeObjectAtIndex:0]; + + NSData *data = [[WKSDK shared].coder encode:sendPacket]; + [sendPacketDatas appendData:data]; + + sendPacketCount++; + if(sendPacketCount>=[WKSDK shared].options.sendMaxCountOfEach) { + break; + } + } + if(sendPacketDatas && sendPacketDatas.length>0) { + [WKConnectionManager.sharedManager writeData:sendPacketDatas]; + } +} + + +@end diff --git a/WuKongIMSDK/Classes/model/WKMessageExtra.h b/WuKongIMSDK/Classes/model/WKMessageExtra.h index 606779e..f824325 100644 --- a/WuKongIMSDK/Classes/model/WKMessageExtra.h +++ b/WuKongIMSDK/Classes/model/WKMessageExtra.h @@ -33,6 +33,7 @@ typedef enum : NSUInteger { @property(nonatomic,assign) NSInteger editedAt; // 消息编辑时间 (0表示消息未被编辑) @property(nonatomic,assign) BOOL isEdit; // 是否编辑 @property(nonatomic,assign) WKContentEditUploadStatus uploadStatus; // 上传状态 +@property(nonatomic,assign) BOOL isMutualDeleted; // 是否双向删除 @property(nonatomic,copy) NSDictionary *extra; // 扩展数据