Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537 Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537
文作者芋艿,原題“芋道 Spring Boot WebSocket 入門”,本次有修訂和改動。
WebSocket如今在Web端即時通訊技術應用里使用廣泛,不僅用于傳統PC端的網頁里,也被很多移動端開發者用于基于HTML5的混合APP里。對于想要在基于Web的應用里添加IM、推送等實時通信功能,WebSocket幾乎是必須要掌握的技術。
本文將基于Tomcat和Spring框架實現一個邏輯簡單的入門級IM應用,對于即時通訊初學者來說,能找到一個簡單直接且能順利跑通的實例代碼,顯然意義更大,本文正是如此。希望能給你的IM開發和學習帶來啟發。
注:源碼在本文第四、五節開頭的附件處可下載。
學習交流:
開源IM框架源碼:https://github.com/JackJiang2011/MobileIMSDK
如果你對Web端即時通訊知識一頭霧水,務必先讀:《新手入門貼:史上最全Web端即時通訊技術原理詳解》、《Web端即時通訊技術盤點:短輪詢、Comet、Websocket、SSE》。
限于篇幅,本文不會深究WebSocket技術理論,如有興趣請從基礎學習:
如果想要更硬核一點的,可以讀讀下面這幾篇:
相比 HTTP 協議來說,WebSocket 協議對大多數后端開發者是比較陌生的。
相對而言:WebSocket 協議重點是提供了服務端主動向客戶端發送數據的能力,這樣我們就可以完成實時性較高的需求。例如:聊天 IM 即使通訊功能、消息訂閱服務、網頁游戲等等。
同時:因為 WebSocket 使用 TCP 通信,可以避免重復創建連接,提升通信質量和效率。例如:美團的長連接服務,具體可以看看 《美團點評的移動端網絡優化實踐:大幅提升連接成功率、速度等 》 。
友情提示:
這里有個誤區,WebSocket 相比普通的 Socket 來說,僅僅是借助 HTTP 協議完成握手,創建連接。后續的所有通信,都和 HTTP 協議無關。
看到這里,大家一定以為又要開始嗶嗶 WebSocket 的概念。哈哈,我偏不~如果對這塊不了的朋友,可以閱讀本文“2、知識準備”這一章。
要想使用WebSocket,一般有如下幾種解決方案可選:
目前筆者手頭有個涉及到 IM 即使通訊的項目,采用的是方案三。
主要原因是:我們對 Netty 框架的實戰、原理與源碼,都相對熟悉一些,所以就考慮了它。并且,除了需要支持 WebSocket 協議,我們還想提供原生的 Socket 協議。
如果僅僅是僅僅提供 WebSocket 協議的支持,可以考慮采用方案一或者方案二,在使用上,兩個方案是比較接近的。相比來說,方案一 Spring WebSocket 內置了對 STOMP 協議的支持。
不過:本文還是采用方案二“Tomcat WebSocket”來作為入門示例。咳咳咳,沒有特殊的原因,主要是開始寫本文之前,已經花了 2 小時使用它寫了一個示例。實在是有點懶,不想改。如果能重來,我要選李白,哈哈哈哈~
當然,不要慌,方案一和方案二的實現代碼,真心沒啥差別。
在開始搭建 Tomcat WebSocket 入門示例之前,我們先來了解下 JSR-356 規范,定義了 Java 針對 WebSocket 的 API :即 Javax WebSocket 。規范是大哥,打死不會提供實現,所以 JSR-356 也是如此。目前,主流的 Web 容器都已經提供了 JSR-356 的實現,例如說 Tomcat、Jetty、Undertow 等等。
示例代碼下載:
(因附件無法上傳到此處,請從同步鏈接處下載:http://www.52im.net/thread-3483-1-1.html)
代碼目錄內容是這樣:
在本小節中,我們會使用 Tomcat WebSocket 搭建一個 WebSocket 的示例。
提供如下消息的功能支持:
考慮到讓示例更加易懂,我們先做成全局有且僅有一個大的聊天室,即建立上 WebSocket 的連接,都自動動進入該聊天室。
下面,開始遨游 WebSocket 這個魚塘...
在 pom.xml 文件中,引入相關依賴。
<?xml version="1.0"encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 [url=http://maven.apache.org/xsd/maven-4.0.0.xsd]http://maven.apache.org/xsd/maven-4.0.0.xsd[/url]">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lab-25-01</artifactId>
<dependencies>
<!-- 實現對 WebSocket 相關依賴的引入,方便~ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 引入 Fastjson ,實現對 JSON 的序列化,因為后續我們會使用它解析消息 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
</project>
具體每個依賴的作用,自己認真看下注釋。
在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,創建 WebsocketServerEndpoint 類,定義 Websocket 服務的端點(EndPoint)。
代碼如下:
// WebsocketServerEndpoint.java
@Controller
@ServerEndpoint("/")
public class WebsocketServerEndpoint {
private Logger logger = LoggerFactory.getLogger(getClass());
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
logger.info("[onOpen][session({}) 接入]", session);
}
@OnMessage
public void onMessage(Session session, String message) {
logger.info("[onOpen][session({}) 接收到一條消息({})]", session, message); // 生產環境下,請設置成 debug 級別
}
@OnClose
public void onClose(Session session, CloseReason closeReason) {
logger.info("[onClose][session({}) 連接關閉。關閉原因是({})}]", session, closeReason);
}
@OnError
public void onError(Session session, Throwable throwable) {
logger.info("[onClose][session({}) 發生異常]", session, throwable);
}
}
如代碼所示:
這是最簡版的 WebsocketServerEndpoint 的代碼。在下文,我們會慢慢把代碼補全。
在 cn.iocoder.springboot.lab24.springwebsocket.config 包路徑下,創建 WebsocketServerEndpoint 配置類。
代碼如下:
// WebSocketConfiguration.java
@Configuration
// @EnableWebSocket // 無需添加該注解,因為我們并不是使用 Spring WebSocket
public class WebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
PS:在 #serverEndpointExporter() 方法中,創建 ServerEndpointExporter Bean 。該 Bean 的作用,是掃描添加有 @ServerEndpoint 注解的 Bean 。
創建 Application.java 類,配置 @SpringBootApplication 注解即可。
代碼如下:
// Application.java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
執行 Application 啟動該示例項目。
考慮到大家可能不會或者不愿意寫前端代碼,所以我們直接使用 WebSocket在線測試工具,測試 WebSocket 連接。
如下圖:
至此,最簡單的一個 WebSocket 項目的骨架,我們已經搭建完成。下面,我們開始改造,把相應的邏輯補全。
在 HTTP 協議中,是基于 Request/Response 請求響應的同步模型,進行交互。在 Websocket 協議中,是基于 Message 消息的異步模型,進行交互。這一點,是很大的不同的,等會看到具體的消息類,感受會更明顯。
因為 WebSocket 協議,不像 HTTP 協議有 URI 可以區分不同的 API 請求操作,所以我們需要在 WebSocket 的 Message 里,增加能夠標識消息類型,這里我們采用 type 字段。
所以在這個示例中,我們采用的 Message 采用 JSON 格式編碼。
格式如下:
{
type: "", // 消息類型
body: {} // 消息體
}
解釋一下:
實際上:我們在該示例中,body 字段對應的 Message 相關的接口和類,實在想不到名字了。所有的 Message 們,我們都放在 cn.iocoder.springboot.lab25.springwebsocket.message 包路徑下。
4.6.1 Message
創建 Message 接口,基礎消息體,所有消息體都要實現該接口。
代碼如下:
// Message.java
publicinterfaceMessage {
}
目前作為一個標記接口,未定義任何操作。
4.6.2 認證相關 Message
創建 AuthRequest 類,用戶認證請求。
代碼如下:
// AuthRequest.java
public class AuthRequest implements Message {
public static final String TYPE = "AUTH_REQUEST";
/**
* 認證 Token
*/
private String accessToken;
// ... 省略 set/get 方法
}
解釋一下:
對于第2)點,在 WebSocket 協議中,我們也需要認證當前連接,用戶身份是什么。一般情況下,我們采用用戶調用 HTTP 登錄接口,登錄成功后返回的訪問令牌 accessToken 。這里,我們先不拓展開講,事后可以看看 《基于 Token 認證的 WebSocket 連接》 文章。
雖然說,WebSocket 協議是基于 Message 模型,進行交互。但是,這并不意味著它的操作,不需要響應結果。例如說,用戶認證請求,是需要用戶認證響應的。所以,我們創建 AuthResponse 類,作為用戶認證響應。
代碼如下:
// AuthResponse.java
public class AuthResponse implements Message {
public static final String TYPE = "AUTH_RESPONSE";
/**
* 響應狀態碼
*/
private Integer code;
/**
* 響應提示
*/
private String message;
// ... 省略 set/get 方法
}
解釋一下:
對于第1)點,實際上,我們在每個 Message 實現類上,都增加了 TYPE 靜態屬性,作為消息類型。下面,我們就不重復贅述了。
在本示例中,用戶成功認證之后,會廣播用戶加入群聊的通知 Message ,使用 UserJoinNoticeRequest 。
代碼如下:
// UserJoinNoticeRequest.java
public class UserJoinNoticeRequest implements Message {
public static final String TYPE = "USER_JOIN_NOTICE_REQUEST";
/**
* 昵稱
*/
private String nickname;
// ... 省略 set/get 方法
}
實際上,我們可以在需要使用到 Request/Response 模型的地方,將 Message 進行拓展:
這樣,在使用到同步模型的業務場景下,Message 實現類使用 Request/Reponse 作為后綴。例如說,用戶認證請求、刪除一個好友請求等等。
而在使用到異步模型能的業務場景下,Message 實現類還是繼續 Message 作為后綴。例如說,發送一條消息,用戶操作完后,無需阻塞等待結果
4.6.3 發送消息相關 Message
創建 SendToOneRequest 類,發送給指定人的私聊消息的 Message。
代碼如下:
// SendToOneRequest.java
public class SendToOneRequest implements Message {
public static final String TYPE = "SEND_TO_ONE_REQUEST";
/**
* 發送給的用戶
*/
private String toUser;
/**
* 消息編號
*/
private String msgId;
/**
* 內容
*/
private String content;
// ... 省略 set/get 方法
}
每個字段,自己看注釋噢。
創建 SendToAllRequest 類,發送給所有人的群聊消息的 Message。
代碼如下:
// SendToAllRequest.java
public class SendToAllRequest implements Message {
public static final String TYPE = "SEND_TO_ALL_REQUEST";
/**
* 消息編號
*/
private String msgId;
/**
* 內容
*/
private String content;
// ... 省略 set/get 方法
}
每個字段,自己看注釋噢。
在服務端接收到發送消息的請求,需要異步響應發送是否成功。所以,創建 SendResponse 類,發送消息響應結果的 Message 。
代碼如下:
// SendResponse.java
public class SendResponse implements Message {
public static final String TYPE = "SEND_RESPONSE";
/**
* 消息編號
*/
private String msgId;
/**
* 響應狀態碼
*/
private Integer code;
/**
* 響應提示
*/
private String message;
// ... 省略 set/get 方法
}
重點看 msgId 字段:即消息編號。客戶端在發送消息,通過使用 UUID 算法,生成全局唯一消息編號(唯一ID的生成技術見:《從新手到專家:如何設計一套億級消息量的分布式IM系統》的“5、唯一ID的技術方案”章節)。這樣,服務端通過 SendResponse 消息響應,通過 msgId 做映射。
在服務端接收到發送消息的請求,需要轉發消息給對應的人。所以,創建 SendToUserRequest 類,發送消息給一個用戶的 Message 。
代碼如下:
// SendResponse.java
public class SendToUserRequest implements Message {
public static final String TYPE = "SEND_TO_USER_REQUEST";
/**
* 消息編號
*/
private String msgId;
/**
* 內容
*/
private String content;
// ... 省略 set/get 方法
}
相比 SendToOneRequest 來說,少一個 toUser 字段。因為,我們可以通過 WebSocket 連接,已經知道發送給誰了。
每個客戶端發起的 Message 消息類型,我們會聲明對應的 MessageHandler 消息處理器。這個就類似在 SpringMVC 中,每個 API 接口對應一個 Controller 的 Method 方法。
所有的 MessageHandler 們,我們都放在 cn.iocoder.springboot.lab25.springwebsocket.handler 包路徑下。
4.7.1 MessageHandler
創建 MessageHandler 接口,消息處理器接口。
代碼如下:
// MessageHandler.java
public interface MessageHandler<T extends Message> {
/**
* 執行處理消息
*
* @param session 會話
* @param message 消息
*/
void execute(Session session, T message);
/**
* @return 消息類型,即每個 Message 實現類上的 TYPE 靜態字段
*/
String getType();
}
解釋一下:
4.7.2 AuthMessageHandler
創建 AuthMessageHandler 類,處理 AuthRequest 消息。
代碼如下:
// AuthMessageHandler.java
@Component
public class AuthMessageHandler implements MessageHandler<AuthRequest> {
@Override
public void execute(Session session, AuthRequest message) {
// 如果未傳遞 accessToken
if(StringUtils.isEmpty(message.getAccessToken())) {
WebSocketUtil.send(session, AuthResponse.TYPE,
new AuthResponse().setCode(1).setMessage("認證 accessToken 未傳入"));
return;
}
// 添加到 WebSocketUtil 中
WebSocketUtil.addSession(session, message.getAccessToken()); // 考慮到代碼簡化,我們先直接使用 accessToken 作為 User
// 判斷是否認證成功。這里,假裝直接成功
WebSocketUtil.send(session, AuthResponse.TYPE,newAuthResponse().setCode(0));
// 通知所有人,某個人加入了。這個是可選邏輯,僅僅是為了演示
WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,
newUserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考慮到代碼簡化,我們先直接使用 accessToken 作為 User
}
@Override
public String getType() {
return AuthRequest.TYPE;
}
}
代碼比較簡單,跟著代碼讀讀即可。
關于 WebSocketUtil 類,我們在「5.8、WebSocketUtil」一節中再來詳細看看。
4.7.3 SendToOneRequest
創建 SendToOneHandler 類,處理 SendToOneRequest 消息。
代碼如下:
// SendToOneRequest.java
@Component
public class SendToOneHandler implements MessageHandler<SendToOneRequest> {
@Override
public void execute(Session session, SendToOneRequest message) {
// 這里,假裝直接成功
SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
// 創建轉發的消息
SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
// 廣播發送
WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);
}
@Override
public String getType() {
return SendToOneRequest.TYPE;
}
}
代碼比較簡單,跟著代碼讀讀即可。
4.7.4 SendToAllHandler
創建 SendToAllHandler 類,處理 SendToAllRequest 消息。
代碼如下:
// SendToAllRequest.java
@Component
public class SendToAllHandler implements MessageHandler<SendToAllRequest> {
@Override
public void execute(Session session, SendToAllRequest message) {
// 這里,假裝直接成功
SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
// 創建轉發的消息
SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
// 廣播發送
WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);
}
@Override
public String getType() {
return SendToAllRequest.TYPE;
}
}
代碼比較簡單,跟著代碼讀讀即可。
代碼在 cn.iocoder.springboot.lab25.springwebsocket.util 包路徑下。
創建 WebSocketUtil 工具類,主要提供兩方面的功能:
整體代碼比較簡單,自己瞅瞅喲。
代碼在目錄中的如下位置:
在本小節,我們會修改 WebsocketServerEndpoint 的代碼,完善其功能。
4.9.1 初始化 MessageHandler 集合
實現 InitializingBean 接口,在 #afterPropertiesSet() 方法中,掃描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。
代碼如下:
// WebsocketServerEndpoint.java
/**
* 消息類型與 MessageHandler 的映射
*
* 注意,這里設置成靜態變量。雖然說 WebsocketServerEndpoint 是單例,但是 Spring Boot 還是會為每個 WebSocket 創建一個 WebsocketServerEndpoint Bean 。
*/
private static final Map<String, MessageHandler> HANDLERS = newHashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void afterPropertiesSet() throws Exception {
// 通過 ApplicationContext 獲得所有 MessageHandler Bean
applicationContext.getBeansOfType(MessageHandler.class).values() // 獲得所有 MessageHandler Bean.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
logger.info("[afterPropertiesSet][消息處理器數量:{}]", HANDLERS.size());
}
通過這樣的方式,可以避免手動配置 MessageHandler 與消息類型的映射。
4.9.2 onOpen
重新實現 #onOpen(Session session, EndpointConfig config) 方法,實現連接時,使用 accessToken 參數進行用戶認證。
代碼如下:
// WebsocketServerEndpoint.java
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
logger.info("[onOpen][session({}) 接入]", session);
// <1> 解析 accessToken
List<String> accessTokenValues = session.getRequestParameterMap().get("accessToken");
String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;
// <2> 創建 AuthRequest 消息類型
AuthRequest authRequest = newAuthRequest().setAccessToken(accessToken);
// <3> 獲得消息處理器
MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE);
if(messageHandler == null) {
logger.error("[onOpen][認證消息類型,不存在消息處理器]");
return;
}
messageHandler.execute(session, authRequest);
}
如代碼所示:
打開三個瀏覽器創建,分別設置服務地址如下:
然后,逐個點擊「開啟連接」按鈕,進行 WebSocket 連接。
最終效果如下圖:
如上圖所示:
4.9.3 onMessage
重新實現 #onMessage(Session session, String message) 方法,實現不同的消息,轉發給不同的 MessageHandler 消息處理器。
代碼如下:
// WebsocketServerEndpoint.java
@OnMessage
public void onMessage(Session session, String message) {
logger.info("[onOpen][session({}) 接收到一條消息({})]", session, message); // 生產環境下,請設置成 debug 級別
try{
// <1> 獲得消息類型
JSONObject jsonMessage = JSON.parseObject(message);
String messageType = jsonMessage.getString("type");
// <2> 獲得消息處理器
MessageHandler messageHandler = HANDLERS.get(messageType);
if(messageHandler == null) {
logger.error("[onMessage][消息類型({}) 不存在消息處理器]", messageType);
return;
}
// <3> 解析消息
Class<? extendsMessage> messageClass = this.getMessageClass(messageHandler);
// <4> 處理消息
Message messageObj = JSON.parseObject(jsonMessage.getString("body"), messageClass);
messageHandler.execute(session, messageObj);
} catch(Throwable throwable) {
logger.info("[onMessage][session({}) message({}) 發生異常]", session, throwable);
}
}
代碼中:
代碼如下:
// WebsocketServerEndpoint.java
private Class<? extends Message> getMessageClass(MessageHandler handler) {
// 獲得 Bean 對應的 Class 類名。因為有可能被 AOP 代理過。
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
// 獲得接口的 Type 數組
Type[] interfaces = targetClass.getGenericInterfaces();
Class<?> superclass = targetClass.getSuperclass();
while((Objects.isNull(interfaces) || 0== interfaces.length) && Objects.nonNull(superclass)) { // 此處,是以父類的接口為準
interfaces = superclass.getGenericInterfaces();
superclass = targetClass.getSuperclass();
}
if(Objects.nonNull(interfaces)) {
// 遍歷 interfaces 數組
for(Type type : interfaces) {
// 要求 type 是泛型參數
if(type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// 要求是 MessageHandler 接口
if(Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
// 取首個元素
if(Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return(Class<Message>) actualTypeArguments[0];
} else{
thrownewIllegalStateException(String.format("類型(%s) 獲得不到消息類型", handler));
}
}
}
}
}
throw new IllegalStateException(String.format("類型(%s) 獲得不到消息類型", handler));
}
這是參考 rocketmq-spring 項目的 DefaultRocketMQListenerContainer#getMessageType() 方法,進行略微修改。
如果大家對 Java 的泛型機制沒有做過一點了解,可能略微有點硬核。可以先暫時跳過,知道意圖即可。
<4> 處,調用 MessageHandler#execute(session, message) 方法,執行處理請求。
另外:這里增加了 try-catch 代碼,避免整個執行的過程中,發生異常。如果在 onMessage 事件的處理中,發生異常,該消息對應的 Session 會話會被自動關閉。顯然,這個不符合我們的要求。例如說,在 MessageHandler 處理消息的過程中,發生一些異常是無法避免的。
繼續基于上述創建的三個瀏覽器,我們先點擊「清空消息」按鈕,清空下消息,打掃下上次測試展示出來的接收得到的 Message 。當然,WebSocket 的連接,不需要去斷開。
在第一個瀏覽器中,分別發送兩種聊天消息。
一條 SendToOneRequest 私聊消息:
{
type: "SEND_TO_ONE_REQUEST",
body: {
toUser: "番茄",
msgId: "eaef4a3c-35dd-46ee-b548-f9c4eb6396fe",
content: "我是一條單聊消息"
}
}
一條 SendToAllHandler 群聊消息:
{
type: "SEND_TO_ALL_REQUEST",
body: {
msgId: "838e97e1-6ae9-40f9-99c3-f7127ed64747",
content: "我是一條群聊消息"
}
}
最終結果如下圖:
如上圖所示:
4.9.4 onClose
重新實現 #onClose(Session session, CloseReason closeReason) 方法,實現移除關閉的 Session 。
代碼如下:
// WebsocketServerEndpoint.java
@OnClose
public void onClose(Session session, CloseReason closeReason) {
logger.info("[onClose][session({}) 連接關閉。關閉原因是({})}]", session, closeReason);
WebSocketUtil.removeSession(session);
}
4.9.5 onError
#onError(Session session, Throwable throwable) 方法,保持不變。
代碼如下:
// WebsocketServerEndpoint.java
@OnError
public void onError(Session session, Throwable throwable) {
logger.info("[onClose][session({}) 發生異常]", session, throwable);
}
示例代碼下載:
(因附件無法上傳到此處,請從同步鏈接處下載:http://www.52im.net/thread-3483-1-1.html)
仔細一個捉摸,虎軀一震,還是提供一個 Spring WebSocket 快速入門的示例。
在 上章「Tomcat WebSocket 實戰入門」 的 lab-websocket-25-01 示例的基礎上,我們復制出 lab-websocket-25-02 項目,進行改造。
改造的代碼目錄內容是這樣:
因為 Tomcat WebSocket 使用的是 Session 作為會話,而 Spring WebSocket 使用的是 WebSocketSession 作為會話,導致我們需要略微修改下 WebSocketUtil 工具類。改動非常略微,點擊 WebSocketUtil.java 查看下,秒懂的噢。
主要有兩點:
將 cn.iocoder.springboot.lab25.springwebsocket.handler 包路徑下的消息處理器們,使用到 Session 類的地方,調整成 WebSocketSession 類。
在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,創建 DemoWebSocketShakeInterceptor 攔截器。因為 WebSocketSession 無法獲得 ws 地址上的請求參數,所以只好通過該攔截器,獲得 accessToken 請求參數,設置到 attributes 中。
代碼如下:
// DemoWebSocketShakeInterceptor.java
public class DemoWebSocketShakeInterceptor extends HttpSessionHandshakeInterceptor {
@Override// 攔截 Handshake 事件
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {
// 獲得 accessToken
if(request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
attributes.put("accessToken", serverRequest.getServletRequest().getParameter("accessToken"));
}
// 調用父方法,繼續執行邏輯
return super.beforeHandshake(request, response, wsHandler, attributes);
}
}
在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,創建 DemoWebSocketHandler 處理器。該處理器參考 「5.9、完善 WebsocketServerEndpoint」 小節,編寫它的代碼。
DemoWebSocketHandler.java代碼位于如下目錄處,具體內容就不貼出來了,自已去讀一讀:
代碼極其相似,簡單擼下即可。
修改 WebSocketConfiguration 配置類,代碼如下:
// WebSocketConfiguration.java
@Configuration
@EnableWebSocket// 開啟 Spring WebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(this.webSocketHandler(), "/") // 配置處理器
.addInterceptors(newDemoWebSocketShakeInterceptor()) // 配置攔截器
.setAllowedOrigins("*"); // 解決跨域問題
}
@Bean
public DemoWebSocketHandler webSocketHandler() {
return new DemoWebSocketHandler();
}
@Bean
public DemoWebSocketShakeInterceptor webSocketShakeInterceptor() {
return new DemoWebSocketShakeInterceptor();
}
}
解釋一下:
至此,我們已經完成 Spring WebSocket 的示例。
后面,我們執行 Application 來啟動項目。具體的測試,這里就不重復了,可以自己使用 WebSocket 在線測試工具 來測試下。
雖然說,WebSocket 協議已經在主流的瀏覽器上,得到非常好的支持,但是總有一些“異類”,是不兼容的。所以就誕生了 SockJS、Socket.io這類庫。關于它們的介紹與使用,可以看看 《SockJS 簡單介紹》 、《Web端即時通訊技術的發展與WebSocket、Socket.io的技術實踐》文章。
實際場景下,我們在使用 WebSocket 還是原生 Socket 也好,都需要考慮“如何保證消息一定送達給用戶?”
大家肯定能夠想到的是:如果用戶不處于在線的時候,消息持久化到 MySQL、MongoDB 等等數據庫中。這個是正確,且是必須要做的。
我們在一起考慮下邊界場景:客戶端網絡環境較差,特別是在移動端場景下,出現網絡閃斷,可能會出現連接實際已經斷開,而服務端以為客戶端處于在線的情況。此時,服務端會將消息發給客戶端,那么消息實際就發送到“空氣”中,產生丟失的情況。
要解決這種情況下的問題,需要引入客戶端的 ACK 消息機制。
目前,主流的有兩種做法。
第一種:基于每一條消息編號 ACK
整體流程如下:
這種方案,因為客戶端逐條 ACK 消息編號,所以會導致客戶端和服務端交互次數過多。當然,客戶端可以異步批量 ACK 多條消息,從而減少次數。
不過因為服務端仍然需要定時輪詢,也會導致服務端壓力較大。所以,這種方案基本已經不采用了。
第二種:基于滑動窗口 ACK
整體流程如下:
這種方式,在業務被稱為推拉結合的方案,在分布式消息隊列、配置中心、注冊中心實現實時的數據同步,經常被采用。
并且,采用這種方案的情況下,客戶端和服務端不一定需要使用長連接,也可以使用長輪詢所替代。
做法比如,客戶端發送帶有消息版本號的 HTTP 請求到服務端:
如果大家對消息可靠投遞這塊感興趣,可以看看下面這幾篇:
畢竟,本篇這里寫的有點簡略哈 ~
最后:如果你想系統的學習IM開發方面方面的知識,推薦詳讀:《新手入門一篇就夠:從零開發移動端IM》。如果你自認為已經有點小牛x了,可以看看生產環境下的大用戶量IM系統架構設計方面的知識:《從新手到專家:如何設計一套億級消息量的分布式IM系統 》。
限于篇幅,這里就不再繼續展開了。
《自已開發IM有那么難嗎?手把手教你自擼一個Andriod版簡易IM (有源碼)》
《一種Android端IM智能心跳算法的設計與實現探討(含樣例代碼)》
《手把手教你用Netty實現網絡通信程序的心跳機制、斷線重連機制》
《輕量級即時通訊框架MobileIMSDK的iOS源碼(開源版)[附件下載]》
《開源IM工程“蘑菇街TeamTalk”2015年5月前未刪減版完整代碼 [附件下載]》
《NIO框架入門(一):服務端基于Netty4的UDP雙向通信Demo演示 [附件下載]》
《NIO框架入門(二):服務端基于MINA2的UDP雙向通信Demo演示 [附件下載]》
《NIO框架入門(三):iOS與MINA2、Netty4的跨平臺UDP雙向通信實戰 [附件下載]》
《NIO框架入門(四):Android與MINA2、Netty4的跨平臺UDP雙向通信實戰 [附件下載]》
《一個WebSocket實時聊天室Demo:基于node.js+socket.io [附件下載]》
《適合新手:從零開發一個IM服務端(基于Netty,有完整源碼)》
《拿起鍵盤就是干:跟我一起徒手開發一套分布式IM系統》
《正確理解IM長連接的心跳及重連機制,并動手實現(有完整IM源碼)》
《適合新手:手把手教你用Go快速搭建高性能、可擴展的IM系統(有源碼)》
《跟著源碼一起學:手把手教你用WebSocket打造Web端IM聊天》
本文已同步發布于“即時通訊技術圈”公眾號。
同步發布鏈接是:http://www.52im.net/thread-3483-1-1.html
本我是準備接著寫我那個多進程教程的,今天心血來潮想看看swoole的websocket,
我就點開了這個
WebSocket
我看了看官網的demo,覺得看起來很簡單嘛,
<?php
//官網demo
$server = new swoole_websocket_server("0.0.0.0", 9501);
$server->on('open', function (swoole_websocket_server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";//$request->fd 是客戶端id
});
$server->on('message', function (swoole_websocket_server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
$server->push($frame->fd, "this is server");//$frame->fd 是客戶端id,$frame->data是客戶端發送的數據
//服務端向客戶端發送數據是用 $server->push( '客戶端id' , '內容')
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();
我就是喜歡這種簡單易懂的demo ,每行代碼意思一看就明白
服務端有了,我找點客戶端的js代碼
火狐的MDN
<!DOCTYPE html>
<html>
<head>
<title></title>
<meta charset="UTF-8">
<script type="text/javascript">
var exampleSocket = new WebSocket("ws://0.0.0.0:9501");
exampleSocket.onopen = function (event) {
exampleSocket.send("親愛的服務器!我連上你啦!");
};
exampleSocket.onmessage = function (event) {
console.log(event.data);
}
</script>
</head>
<body>
<input type="text" id="content">
<button onclick="exampleSocket.send( document.getElementById('content').value )">發送</button>
</body>
</html>
最后命令行運行php文件,之后瀏覽器打開html文件,
F12打開調試界面看console,ok , 沒有問題
這個時候我突然想到一個事情,因為我做多進程的那個教程里,在主進程中會將所有的子進程的句柄存起來,以后進行進程間通訊用。
那么 我將所有的客戶端的鏈接存起來存成數組,每當一個客戶端發送消息時,我就遍歷這個客戶端數組,將消息群發一遍,不久實現了聊天室了嗎?
然后就,服務端代碼成了這個樣子
<?php
$map = array();//客戶端集合
$server = new swoole_websocket_server("0.0.0.0", 9501);
$server->on('open', function (swoole_websocket_server $server, $request) {
global $map;//客戶端集合
$map[$request->fd] = $request->fd;//首次連上時存起來
});
$server->on('message', function (swoole_websocket_server $server, $frame) {
global $map;//客戶端集合
$data = $frame->data;
foreach($map as $fd){
$server->push($fd , $data);//循環廣播
}
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();
哈哈 , 我覺得這樣就大功告成了,結果發現自己是 圖樣圖森破
大家可以自己試試,運行php后 , 瀏覽器打開兩個頁面,看看console.log的內容是什么
運行良好,可是并沒有實現我們說的那種聊天效果。
找找原因吧。
我第一反映看看$map里面是什么,就輸出看看,結果發現這個map里面只有一個元素。
唉,不對啊,我這是全局變量,難道不應該是有幾個客戶端鏈接,就有幾個元素嗎?
這是怎么回事啊,竟然沒有保存到所有客戶端id?
到了這一步,我解決不了map變量的這個問題了,然后我就想看看那個fd是什么東西,
老規矩 var_dump輸出 , 發現fd就是 int類型的數字,并且是自增的
這好辦了,不就是數字嘛
于是呼,我就這樣做
變量存不了,我搞不定,我存文本里嘛。
最終版 websocket.php
<?php
$server = new swoole_websocket_server("0.0.0.0", 9501);
$server->on('open', function (swoole_websocket_server $server, $request) {
file_put_contents( __DIR__ .'/log.txt' , $request->fd);
});
$server->on('message', function (swoole_websocket_server $server, $frame) {
global $client;
$data = $frame->data;
$m = file_get_contents( __DIR__ .'/log.txt');
for ($i=1 ; $i<= $m ; $i++) {
echo PHP_EOL . ' i is ' . $i . ' data is '.$data . ' m = ' . $m;
$server->push($i, $data );
}
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();
再次打開html文件,多個頁面進行輸入觀察,ok,可以了。
當然,作為聊天室,我這寫的也過于簡陋了,界面大家自己可以寫的好看一些(因為我懶的寫界面)
還有,每次的發送聊天的記錄,應該存起來,這樣,如果有新的連接連過來的時候,先把以前的聊天記錄發過去,這樣,我想體驗更好一些
然后,大家可以愉快的聊天了。哈哈
在“疫情”期間已經淘汰了一批末端的業務coder,現在是自己努力成為資深程序員的好時機,才能在面對高薪職位邀請時,做到胸有成竹。為了大家能夠順利進階PHP中高級程序員、架構師,我為大家準備了一份中高級的教程福利!
作為web開發的佼佼者PHP并不遜色其他語言,加上swoole后更加是如虎添翼!進軍通信 、物聯網行業開發百度地圖、百度訂單中心等!年后更是霸占程序員招聘語言第二名,疫情裁員期過后正是各大企業擴大招人的時期,現在市場初級程序員泛濫,進階中高級程序員絕對是各大企業急需的人才,這套教程適合那些1-6年的PHP開發者進階中高級提升自己,在春招中找到高薪職位!
者:五月君
轉發鏈接:https://mp.weixin.qq.com/s/TLKkRwftewa0uMvIRlOf5g
*請認真填寫需求信息,我們會在24小時內與您取得聯系。