gmnon.cn-疯狂蹂躏欧美一区二区精品,欧美精品久久久久a,高清在线视频日韩欧美,日韩免费av一区二区

站長資訊網
最全最豐富的資訊網站

基于Java NIO的即時聊天服務器模型

折騰了一個周,終于搞出來了一個雛形,相比于xmpp的xml,本人更喜歡json的簡潔,為了防止客戶端異常斷開等,準備采用心跳檢測的機制來判斷用戶是否在線,另外還有一種方法是學習例如Tomcat等Servlet中間件的方式,設置Session周期,定時清除過期Session。

    前不久自己動手寫了一個Android的聊天工具,跟服務器的交互還是基于HTTP方式的,在一般通訊上還算湊活,但是在即時聊天的時候就有點惡心了,客戶端開啟Service每隔3秒去詢問服務器是否有自己的新消息(當然3秒有點太快了),在心疼性能和流量的前提下,只能自己動手寫個服務器,傳統的Socket是阻塞的,這樣的話服務器對每個Socket都需要建立一個線程來操作,資源開銷很大,而且線程多了直接會影響服務端的性能(曾經測試開了3000多個線程就不讓創建了,所以并發數目也是有限制的),聽說從JDK1.5就多了個New
    IO,灰常不錯的樣子,找了找相關的資料,網上竟然全都是最最最簡單的一個demo,然后去CSDN發帖,基本上都是建議直接使用MINA框架的,這樣一來根本達不到學習NIO的目的,而且現在的技術也太快餐了,只知道使用前輩留下的東西,知其然不知其所以然。

    折騰了一個周,終于搞出來了一個雛形,相比于xmpp的xml,本人更喜歡json的簡潔,為了防止客戶端異常斷開等,準備采用心跳檢測的機制來判斷用戶是否在線,另外還有一種方法是學習例如Tomcat等Servlet中間件的方式,設置Session周期,定時清除過期Session。本Demo暫時實現了Session過期檢測,心跳檢測有空再搞,如果本例子在使用過程中有性能漏洞或者什么bug請及時通知我,謝謝。

    廢話不多說,關于NIO的SelectionKey、Selector、Channel網上的介紹例子都很多,直接上代碼:

    JsonParser

    Json的解析類,隨便封裝了下,使用的最近比較火的fastjson

                     
    1. public class JsonParser {
    2. private static JSONObject mJson;
    3. public synchronized static String get(String json,String key) {
    4. mJson = JSON.parseObject(json);
    5. return mJson.getString(key);
    6. }
    7. }
     

    Main

    入口,不解釋

                                                                                                     
    1. public class Main {
    2. public static void main(String… args) {
    3. new SeekServer().start();
    4. }
    5. }
     

    Log

                                                                                                                                                             
    1. public class Log {
    2. public static void i(Object obj) {
    3. System.out.println(obj);
    4. }
    5. public static void e(Object e) {
    6. System.err.println(e);
    7. }
    8. }
     

    SeekServer:

    服務器端的入口,請求的封裝和接收都在此類,端口暫時寫死在了代碼里,mSelector.select(TIME_OUT) > 0
    目的是為了當服務器空閑的時候(沒有任何讀寫甚至請求斷開事件),循環時有個間隔時間,不然基本上相當于while(true){//nothing}了,你懂的。

                                                                                                                                                                                                                                             
    1. public class SeekServer extends Thread{
    2. private final int ACCPET_PORT = 55555;
    3. private final int TIME_OUT = 1000;
    4. private Selector mSelector = null;
    5. private ServerSocketChannel mSocketChannel = null;
    6. private ServerSocket mServerSocket = null;
    7. private InetSocketAddress mAddress = null;
    8. public SeekServer() {
    9. long sign = System.currentTimeMillis();
    10. try {
    11. mSocketChannel = ServerSocketChannel.open();
    12. if(mSocketChannel == null) {
    13. System.out.println(“can’t open server socket channel”);
    14. }
    15. mServerSocket = mSocketChannel.socket();
    16. mAddress = new InetSocketAddress(ACCPET_PORT);
    17. mServerSocket.bind(mAddress);
    18. Log.i(“server bind port is “ + ACCPET_PORT);
    19. mSelector = Selector.open();
    20. mSocketChannel.configureBlocking(false);
    21. SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
    22. key.attach(new Acceptor());
    23. //檢測Session狀態
    24. Looper.getInstance().loop();
    25. //開始處理Session
    26. SessionProcessor.start();
    27. Log.i(“Seek server startup in “ + (System.currentTimeMillis() – sign) + “ms!”);
    28. } catch (ClosedChannelException e) {
    29. Log.e(e.getMessage());
    30. } catch (IOException e) {
    31. Log.e(e.getMessage());
    32. }
    33. }
    34. public void run() {
    35. Log.i(“server is listening…”);
    36. while(!Thread.interrupted()) {
    37. try {
    38. if(mSelector.select(TIME_OUT) > 0) {
    39. Set<SelectionKey> keys = mSelector.selectedKeys();
    40. Iterator<SelectionKey> iterator = keys.iterator();
    41. SelectionKey key = null;
    42. while(iterator.hasNext()) {
    43. key = iterator.next();
    44. Handler at = (Handler) key.attachment();
    45. if(at != null) {
    46. at.exec();
    47. }
    48. iterator.remove();
    49. }
    50. }
    51. } catch (IOException e) {
    52. Log.e(e.getMessage());
    53. }
    54. }
    55. }
    56. class Acceptor extends Handler{
    57. public void exec(){
    58. try {
    59. SocketChannel sc = mSocketChannel.accept();
    60. new Session(sc, mSelector);
    61. } catch (ClosedChannelException e) {
    62. Log.e(e);
    63. } catch (IOException e) {
    64. Log.e(e);
    65. }
    66. }
    67. }
    68. }
     

    Handler:

    只有一個抽象方法exec,Session將會繼承它。


    1. public abstract class Handler {
    2. public abstract void exec();
    3. }
     

    Session:

    封裝了用戶的請求和SelectionKey和SocketChannel,每次接收到新的請求時都重置它的最后活動時間,通過狀態mState=READING
    or SENDING 去執行消息的接收與發送,當客戶端異常斷開時則從SessionManager清除該會話。


    1. public class Session extends Handler{
    2. private SocketChannel mChannel;
    3. private SelectionKey mKey;
    4. private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
    5. private Charset charset = Charset.forName(“UTF-8”);
    6. private CharsetDecoder mDecoder = charset.newDecoder();
    7. private CharsetEncoder mEncoder = charset.newEncoder();
    8. private long lastPant;//最后活動時間
    9. private final int TIME_OUT = 1000 * 60 * 5; //Session超時時間
    10. private String key;
    11. private String sendData = “”;
    12. private String receiveData = null;
    13. public static final int READING = 0,SENDING = 1;
    14. int mState = READING;
    15. public Session(SocketChannel socket, Selector selector) throws IOException {
    16. this.mChannel = socket;
    17. mChannel = socket;
    18. mChannel.configureBlocking(false);
    19. mKey = mChannel.register(selector, 0);
    20. mKey.attach(this);
    21. mKey.interestOps(SelectionKey.OP_READ);
    22. selector.wakeup();
    23. lastPant = Calendar.getInstance().getTimeInMillis();
    24. }
    25. public String getReceiveData() {
    26. return receiveData;
    27. }
    28. public void clear() {
    29. receiveData = null;
    30. }
    31. public void setSendData(String sendData) {
    32. mState = SENDING;
    33. mKey.interestOps(SelectionKey.OP_WRITE);
    34. this.sendData = sendData + “n”;
    35. }
    36. public boolean isKeekAlive() {
    37. return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
    38. }
    39. public void setAlive() {
    40. lastPant = Calendar.getInstance().getTimeInMillis();
    41. }
    42. /**
    43. * 注銷當前Session
    44. */
    45. public void distroy() {
    46. try {
    47. mChannel.close();
    48. mKey.cancel();
    49. } catch (IOException e) {}
    50. }
    51. @Override
    52. public synchronized void exec() {
    53. try {
    54. if(mState == READING) {
    55. read();
    56. }else if(mState == SENDING) {
    57. write();
    58. }
    59. } catch (IOException e) {
    60. SessionManager.remove(key);
    61. try {
    62. mChannel.close();
    63. } catch (IOException e1) {
    64. Log.e(e1);
    65. }
    66. mKey.cancel();
    67. }
    68. }
    69. public void read() throws IOException{
    70. mRreceiveBuffer.clear();
    71. int sign = mChannel.read(mRreceiveBuffer);
    72. if(sign == –1) { //客戶端連接關閉
    73. mChannel.close();
    74. mKey.cancel();
    75. }
    76. if(sign > 0) {
    77. mRreceiveBuffer.flip();
    78. receiveData = mDecoder.decode(mRreceiveBuffer).toString();
    79. setAlive();
    80. setSign();
    81. SessionManager.addSession(key, this);
    82. }
    83. }
    84. private void setSign() {
    85. //設置當前Session的Key
    86. key = JsonParser.get(receiveData,“imei”);
    87. //檢測消息類型是否為心跳包
    88. // String type = jo.getString(“type”);
    89. // if(type.equals(“HEART_BEAT”)) {
    90. // setAlive();
    91. // }
    92. }
    93. /**
    94. * 寫消息
    95. */
    96. public void write() {
    97. try {
    98. mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
    99. sendData = null;
    100. mState = READING;
    101. mKey.interestOps(SelectionKey.OP_READ);
    102. } catch (CharacterCodingException e) {
    103. e.printStackTrace();
    104. } catch (IOException e) {
    105. try {
    106. mChannel.close();
    107. } catch (IOException e1) {
    108. Log.e(e1);
    109. }
    110. }
    111. }
    112. }
     

    SessionManager:

    將所有Session存放到ConcurrentHashMap,這里使用手機用戶的imei做key,ConcurrentHashMap因為是線程安全的,所以能很大程度上避免自己去實現同步的過程,
    封裝了一些操作Session的方法例如get,remove等。


    1. public class SessionManager {
    2. private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
    3. public static void addSession(String key,Session session) {
    4. sessions.put(key, session);
    5. }
    6. public static Session getSession(String key) {
    7. return sessions.get(key);
    8. }
    9. public static Set<String> getSessionKeys() {
    10. return sessions.keySet();
    11. }
    12. public static int getSessionCount() {
    13. return sessions.size();
    14. }
    15. public static void remove(String[] keys) {
    16. for(String key:keys) {
    17. if(sessions.containsKey(key)) {
    18. sessions.get(key).distroy();
    19. sessions.remove(key);
    20. }
    21. }
    22. }
    23. public static void remove(String key) {
    24. if(sessions.containsKey(key)) {
    25. sessions.get(key).distroy();
    26. sessions.remove(key);
    27. }
    28. }
    29. }
     

    SessionProcessor

    里面使用了JDK自帶的線程池,用來分發處理所有Session中當前需要處理的請求(線程池的初始化參數不是太熟,望有了解的童鞋能告訴我),內部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點熟悉的感覺,對沒錯,JavaWeb里到處都是request和response)。

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
    1. public class SessionProcessor implements Runnable{
    2. private static Runnable processor = new SessionProcessor();
    3. private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
    4. public static void start() {
    5. new Thread(processor).start();
    6. }
    7. @Override
    8. public void run() {
    9. while(true) {
    10. Session tmp = null;
    11. for(String key:SessionManager.getSessionKeys()) {
    12. tmp = SessionManager.getSession(key);
    13. //處理Session未處理的請求
    14. if(tmp.getReceiveData() != null) {
    15. pool.execute(new Process(tmp));
    16. }
    17. }
    18. try {
    19. Thread.sleep(10);
    20. } catch (InterruptedException e) {
    21. Log.e(e);
    22. }
    23. }
    24. }
    25. class Process implements Runnable {
    26. private SocketRequest request;
    27. private SocketResponse response;
    28. public Process(Session session) {
    29. //將Session封裝成Request和Response
    30. request = new SocketRequest(session);
    31. response = new SocketResponse(session);
    32. }
    33. @Override
    34. public void run() {
    35. new RequestTransform().transfer(request, response);
    36. }
    37. }
    38. }
     

    RequestTransform里的transfer方法利用反射對請求參數中的請求類別和請求動作來調用不同類的不同方法(UserHandler和MessageHandler)


    1. public class RequestTransform {
    2. public void transfer(SocketRequest request,SocketResponse response) {
    3. String action = request.getValue(“action”);
    4. String handlerName = request.getValue(“handler”);
    5. //根據Session的請求類型,讓不同的類方法去處理
    6. try {
    7. Class<?> c= Class.forName(“com.seek.server.handler.” + handlerName);
    8. Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
    9. Method method=c.getMethod(action,arg);
    10. method.invoke(c.newInstance(), new Object[]{request,response});
    11. } catch (Exception e) {
    12. e.printStackTrace();
    13. }
    14. }
    15. }
     

    SocketRequest和SocketResponse

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
    1. public class SocketRequest {
    2. private Session mSession;
    3. private String mReceive;
    4. public SocketRequest(Session session) {
    5. mSession = session;
    6. mReceive = session.getReceiveData();
    7. mSession.clear();
    8. }
    9. public String getValue(String key) {
    10. return JsonParser.get(mReceive, key);
    11. }
    12. public String getQueryString() {
    13. return mReceive;
    14. }
    15. }
     


    1. public class SocketResponse {
    2. private Session mSession;
    3. public SocketResponse(Session session) {
    4. mSession = session;
    5. }
    6. public void write(String msg) {
    7. mSession.setSendData(msg);
    8. }
    9. }
     

    最后則是兩個處理請求的Handler


    1. public class UserHandler {
    2. public void login(SocketRequest request,SocketResponse response) {
    3. System.out.println(request.getQueryString());
    4. //TODO: 處理用戶登錄
    5. response.write(“你肯定收到消息了”);
    6. }
    7. }
     


    1. public class MessageHandler {
    2. public void send(SocketRequest request,SocketResponse response) {
    3. System.out.println(request.getQueryString());
    4. //消息發送
    5. String key = request.getValue(“imei”);
    6. Session session = SessionManager.getSession(key);
    7. new SocketResponse(session).write(request.getValue(“sms”));
    8. }
    9. }
     

    還有個監測是否超時的類Looper,定期去刪除Session


    1. public class Looper extends Thread{
    2. private static Looper looper = new Looper();
    3. private static boolean isStart = false;
    4. private final int INTERVAL = 1000 * 60 * 5;
    5. private Looper(){}
    6. public static Looper getInstance() {
    7. return looper;
    8. }
    9. public void loop() {
    10. if(!isStart) {
    11. isStart = true;
    12. this.start();
    13. }
    14. }
    15. public void run() {
    16. Task task = new Task();
    17. while(true) {
    18. //Session過期檢測
    19. task.checkState();
    20. //心跳包檢測
    21. //task.sendAck();
    22. try {
    23. Thread.sleep(INTERVAL);
    24. } catch (InterruptedException e) {
    25. Log.e(e);
    26. }
    27. }
    28. }
    29. }
     


    1. public class Task {
    2. public void checkState() {
    3. Set<String> keys = SessionManager.getSessionKeys();
    4. if(keys.size() == 0) {
    5. return;
    6. }
    7. List<String> removes = new ArrayList<String>();
    8. Iterator<String> iterator = keys.iterator();
    9. String key = null;
    10. while(iterator.hasNext()) {
    11. key = iterator.next();
    12. if(!SessionManager.getSession(key).isKeekAlive()) {
    13. removes.add(key);
    14. }
    15. }
    16. if(removes.size() > 0) {
    17. Log.i(“sessions is time out,remove “ + removes.size() + “session”);
    18. }
    19. SessionManager.remove(removes.toArray(new String[removes.size()]));
    20. }
    21. public void sendAck() {
    22. Set<String> keys = SessionManager.getSessionKeys();
    23. if(keys.size() == 0) {
    24. return;
    25. }
    26. Iterator<String> iterator = keys.iterator();
    27. while(iterator.hasNext()) {
    28. iterator.next();
    29. //TODO 發送心跳包
    30. }
    31. }
    32. }
     

    注意,在Task和SessionProcessor類里都有對SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問題,推薦使用遍歷Entry的方式來獲取Key和Value,因為一直在JavaWeb上折騰,所以會的童鞋看到Request和Response會挺親切,這個例子沒有經過任何安全和性能測試,如果需要放到生產環境上得話請先自行做測試-
    -!

    客戶端請求時的數據內容例如{handler:”UserHandler”,action:”login”,imei:”2364656512636″…….},這些約定就自己來定了。

    贊(0)
    分享到: 更多 (0)
    ?
    網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
    gmnon.cn-疯狂蹂躏欧美一区二区精品,欧美精品久久久久a,高清在线视频日韩欧美,日韩免费av一区二区
    香蕉视频xxxx| 日本高清一区二区视频| 成人黄色av片| 国产欧美高清在线| 亚洲精品www.| 美女在线免费视频| 国产精品国产三级国产专区51| 欧美视频在线观看视频| 丝袜老师办公室里做好紧好爽 | 日韩一级免费在线观看| 欧美精品成人网| 一区中文字幕在线观看| 久久久久99精品成人片| 日韩欧美一区二| 在线观看免费视频高清游戏推荐| 爱爱爱视频网站| 777av视频| 成人性生交免费看| 国产毛片久久久久久国产毛片| 久久国产亚洲精品无码| xxww在线观看| 免费国产成人av| 国产a级黄色大片| 少妇一级淫免费播放| 成人一级生活片| 久久6免费视频| 青青草原成人网| 中文字幕制服丝袜在线| 2022亚洲天堂| 欧美视频在线第一页| 97公开免费视频| 无码专区aaaaaa免费视频| 毛片毛片毛片毛| 日本在线视频www| www.好吊操| 中文字幕黄色大片| 91亚洲免费视频| 久久国产乱子伦免费精品| 欧美中日韩在线| 日韩中文在线字幕| 亚洲36d大奶网| 少妇高潮喷水久久久久久久久久| 992tv成人免费观看| www.国产视频.com| 亚洲视频在线观看一区二区三区| 国产高清av在线播放| 大桥未久一区二区三区| aaa一级黄色片| 九九精品久久久| 欧美一级特黄a| 免费日韩中文字幕| ww国产内射精品后入国产| 人妻无码一区二区三区四区| 中文字幕第22页| 天天色天天综合网| 老司机久久精品| 中文字幕1234区| 超碰在线超碰在线| 自拍偷拍视频在线| 青青草原国产免费| 亚洲男人天堂2021| 中国 免费 av| 黄色a级片免费看| 国产美女主播在线播放| 日本在线xxx| 黄色国产精品视频| 99视频在线视频| 老司机久久精品| 91视频成人免费| 日韩黄色短视频| 白嫩少妇丰满一区二区| 精品999在线| 亚洲欧美日韩不卡| 黄色片免费在线观看视频| 国产精品videossex国产高清| 国产精品88久久久久久妇女| 国产aaa免费视频| 国产又大又黄又粗的视频| 911福利视频| www.好吊操| 激情视频免费网站| ijzzijzzij亚洲大全| 国产日韩av网站| www.精品在线| 男人的天堂avav| 国产成人精品无码播放| 黄色网址在线免费看| 国产av麻豆mag剧集| 蜜桃福利午夜精品一区| 精品国产av无码一区二区三区| 欧美激情国产精品日韩| 亚洲国产欧美91| 色欲色香天天天综合网www| 另类小说色综合| 久久久亚洲国产精品| 污污的网站18| 噜噜噜久久亚洲精品国产品麻豆| 久久久久久久久久久久久久久国产| 韩国无码av片在线观看网站| 久久国产乱子伦免费精品| 亚洲国产欧美91| 中文字幕在线观看第三页| 超碰人人爱人人| 精品综合久久久久| 蜜臀精品一区二区| 第一区免费在线观看| 2018日日夜夜| 女人床在线观看| 国内av一区二区| 天天干在线影院| www.com毛片| 免费在线观看污网站| 青青在线视频观看| 国产二级片在线观看| 精品一区二区三区毛片| 午夜免费福利视频在线观看| 日韩精品视频一区二区在线观看| 天天想你在线观看完整版电影免费| 女性隐私黄www网站视频| www成人免费| 中文字幕色呦呦| 亚洲一级片av| 日本xxxx黄色| 欧美三级午夜理伦三级富婆| 国产日韩一区二区在线| 无码av天堂一区二区三区| 欧美亚洲色图视频| 麻豆中文字幕在线观看| 国产传媒免费观看| 91亚洲精品久久久蜜桃借种| 在线观看的毛片| 四季av一区二区| 午夜久久久精品| 性生生活大片免费看视频| 免费看污黄网站| 日韩一级免费在线观看| 99视频在线免费| 亚洲第一中文av| 一级淫片在线观看| 永久免费在线看片视频| 日韩人妻精品一区二区三区| 国产av不卡一区二区| 99热这里只有精品7| 丁香色欲久久久久久综合网| 女人床在线观看| 黄页网站大全在线观看| 免费av手机在线观看| 国产福利视频在线播放| 亚洲欧美久久久久| 在线观看18视频网站| 黄色三级中文字幕| 国产网站免费在线观看| 成人精品视频一区二区| 美女黄色片视频| aaa一级黄色片| 欧美亚洲色图视频| 亚洲色精品三区二区一区| 波多野结衣天堂| 黄色一级片免费播放| 中国一级大黄大黄大色毛片| 亚洲精品久久久久久久蜜桃臀| 怡红院av亚洲一区二区三区h| 日韩在线第三页| 丁香色欲久久久久久综合网| 无码少妇一区二区三区芒果| 中文字幕 欧美日韩| 国产爆乳无码一区二区麻豆| 欧美一级黄色片视频| 欧美少妇一级片| 欧美性猛交久久久乱大交小说 | 午夜免费一区二区| 亚洲国产精品女人| 能在线观看的av| 99精品一级欧美片免费播放| 成年网站在线免费观看| 国产一级片自拍| 无码精品a∨在线观看中文| 中文字幕日韩综合| 免费在线激情视频| 国产欧美综合一区| 三年中国国语在线播放免费| 国产 欧美 日韩 一区| 五月婷婷六月合| av之家在线观看| 9色视频在线观看| 伊人五月天婷婷| 8x8x最新地址| 久草青青在线观看| 久久av高潮av| 欧美 国产 精品| 欧美精品色视频| 自拍偷拍 国产| 久久久久久香蕉| 黄页免费在线观看视频| 国产黑丝在线视频| 欧美男女交配视频| 欧美精品成人网| 男人的天堂日韩| 欧美一级黄色影院| 亚洲精品中文字幕无码蜜桃|