webscoket_util.dart 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import 'dart:async';
  2. import 'package:get/get.dart';
  3. import 'package:local_notifier/local_notifier.dart';
  4. import 'package:web_socket_channel/io.dart';
  5. import 'package:web_socket_channel/status.dart' as status;
  6. import '../models/im/index.dart';
  7. import '../values/index.dart';
  8. import 'index.dart';
  9. enum O2WebsocketStatus {
  10. SocketStatusConnected, // 已连接
  11. SocketStatusFailed, // 失败
  12. SocketStatusClosed, // 连接关闭
  13. }
  14. class O2WebsocketClient {
  15. static final O2WebsocketClient _instance = O2WebsocketClient._internal();
  16. factory O2WebsocketClient() {
  17. return _instance;
  18. }
  19. O2WebsocketClient._internal() {
  20. //初始化
  21. }
  22. final heartBeatKey = 'heartbeat';
  23. IOWebSocketChannel? _channel;
  24. var _status = O2WebsocketStatus.SocketStatusConnected;
  25. Timer? _heartBeat; // 心跳定时器
  26. final int _heartTimes = 3000; // 心跳间隔(毫秒)
  27. final int _reconnectCount = 5; // 重连次数,默认5次
  28. int _reconnectTimes = 0; // 重连计数器
  29. Timer? _reconnectTimer; // 重连定时器
  30. var _isReconnect = true;
  31. final eventBus = EventBus();
  32. final channelUtil = O2FlutterMethodChannelUtils();
  33. ///
  34. /// 启动websocket
  35. ///
  36. void open({reopen = false}) {
  37. String? url = O2ApiManager.instance.websocketUrl();
  38. OLogger.i("启动websocket , url: $url");
  39. if (url == null || url.isEmpty) {
  40. OLogger.e('没有获取到websocket地址,无法连接!');
  41. return;
  42. }
  43. try {
  44. _closeSocket(false);
  45. _channel = IOWebSocketChannel.connect(Uri.parse(url));
  46. _status = O2WebsocketStatus.SocketStatusConnected;
  47. // 连接成功,重置重连计数器
  48. if (!reopen) {
  49. _reconnectTimes = 0;
  50. }
  51. if (_reconnectTimer != null) {
  52. _reconnectTimer?.cancel();
  53. _reconnectTimer = null;
  54. }
  55. _initHeartBeat();
  56. _channel?.stream.listen(_webSocketOnMessage, onDone: _webSocketOnDone, onError: _webSocketOnError);
  57. }catch(e) {
  58. OLogger.e(e);
  59. }
  60. }
  61. ///
  62. /// 关闭websokect
  63. ///
  64. void stop() {
  65. _closeSocket(false);
  66. }
  67. ///
  68. /// 发送websocket消息
  69. ///
  70. void sendMsg(String msg) {
  71. if (_channel != null) {
  72. switch(_status) {
  73. case O2WebsocketStatus.SocketStatusClosed:
  74. OLogger.i(" websokect 连接已关闭!");
  75. break;
  76. case O2WebsocketStatus.SocketStatusFailed:
  77. OLogger.i(" websokect 连接失败!");
  78. break;
  79. case O2WebsocketStatus.SocketStatusConnected:
  80. _channel?.sink.add(msg);
  81. break;
  82. default:
  83. break;
  84. }
  85. }
  86. }
  87. /// WebSocket接收消息回调
  88. _webSocketOnMessage(event) {
  89. if (!(event is String && event == heartBeatKey)) {
  90. OLogger.i("接收到 websokect 消息,$event");
  91. Map<String, dynamic> msg = O2Utils.parseStringToJson(event);
  92. if (msg["type"] != null) {
  93. // 聊天消息
  94. if (O2.o2MessageTypeIMCreate == msg["type"]) {
  95. _imCreateMessageNotify(msg);
  96. } else if (O2.o2MessageTypeIMRevoke == msg["type"]) {
  97. _imRevokeMessageNotify(msg);
  98. } else if (O2.o2MeesageTypeIMConversationUpdate == msg['type']) {
  99. _imConversationUpdateNotify(msg);
  100. } else if (O2.o2MeesageTypeIMConversationUpdate == msg['type']) {
  101. _imConversationDeleteNotify(msg);
  102. }
  103. }
  104. }
  105. }
  106. _imConversationDeleteNotify(Map<String, dynamic> msg) {
  107. final body = msg["body"];
  108. if (body != null) {
  109. IMConversationInfo conv = IMConversationInfo.fromJson(body);
  110. eventBus.emit(EventBus.websocketImConversationDeleteMsg, conv);
  111. }
  112. }
  113. _imConversationUpdateNotify(Map<String, dynamic> msg) {
  114. final body = msg["body"];
  115. if (body != null) {
  116. IMConversationInfo conv = IMConversationInfo.fromJson(body);
  117. eventBus.emit(EventBus.websocketImConversationUpdateMsg, conv);
  118. }
  119. }
  120. /// 刷新界面
  121. _imRevokeMessageNotify(Map<String, dynamic> msg) {
  122. var body = msg["body"];
  123. if (body == null) {
  124. return;
  125. }
  126. IMMessage message = IMMessage.fromJson(body);
  127. eventBus.emit(EventBus.websocketRevokeImMsg, message);
  128. }
  129. ///
  130. /// 聊天消息
  131. ///
  132. /// 通知界面刷新消息
  133. /// pc端发送消息通知
  134. _imCreateMessageNotify(Map<String, dynamic> msg) {
  135. var body = msg["body"];
  136. if (body == null) {
  137. return;
  138. }
  139. IMMessage message = IMMessage.fromJson(body);
  140. eventBus.emit(EventBus.websocketCreateImMsg, message);
  141. // macos 上发送消息通知
  142. if (GetPlatform.isMacOS ) {
  143. channelUtil.sendIMMsgToPCNotify(message.id ,msg["title"] ?? '', message.toBody()?.conversationBodyString() ?? '');
  144. } else if (GetPlatform.isWindows) {
  145. LocalNotification notification = LocalNotification(
  146. title: 'appName'.tr,
  147. body: '${msg["title"] ?? ''}: ${message.toBody()?.conversationBodyString() ?? ''}',
  148. );
  149. notification.show();
  150. // channelUtil.sendIMMsgToPCNotify(message.id ,msg["title"] ?? '', message.toBody()?.conversationBodyString() ?? '');
  151. }
  152. }
  153. /// WebSocket关闭连接回调
  154. _webSocketOnDone() {
  155. OLogger.d("onDone , websocket done !!!");
  156. if (_isReconnect) {
  157. _reconnect();
  158. }
  159. }
  160. /// WebSocket连接错误回调
  161. _webSocketOnError(err) {
  162. OLogger.e("onError, websocket error", err);
  163. _status = O2WebsocketStatus.SocketStatusFailed;
  164. _closeSocket(true);
  165. }
  166. /// 初始化心跳
  167. void _initHeartBeat() {
  168. _destroyHeartBeat();
  169. _heartBeat = Timer.periodic(Duration(milliseconds: _heartTimes), (timer) {
  170. _sentHeart();
  171. });
  172. }
  173. /// 心跳
  174. void _sentHeart() {
  175. sendMsg('heartbeat');
  176. }
  177. /// 重连机制
  178. void _reconnect() {
  179. if (_reconnectTimes < _reconnectCount) {
  180. _reconnectTimes++;
  181. _reconnectTimer = Timer.periodic(Duration(milliseconds: _heartTimes), (timer) {
  182. open(reopen: true);
  183. });
  184. } else {
  185. if (_reconnectTimer != null) {
  186. OLogger.i('重连次数超过最大次数');
  187. _reconnectTimer?.cancel();
  188. _reconnectTimer = null;
  189. }
  190. return;
  191. }
  192. }
  193. /// 销毁心跳
  194. void _destroyHeartBeat() {
  195. if (_heartBeat != null) {
  196. _heartBeat?.cancel();
  197. _heartBeat = null;
  198. }
  199. }
  200. /// 关闭WebSocket
  201. void _closeSocket(bool isReconnect) {
  202. if (_channel != null && _status != O2WebsocketStatus.SocketStatusClosed) {
  203. OLogger.i('WebSocket连接关闭');
  204. _isReconnect = isReconnect;
  205. _channel?.sink.close(status.goingAway);
  206. _destroyHeartBeat();
  207. _status = O2WebsocketStatus.SocketStatusClosed;
  208. }
  209. }
  210. }