远方的灯塔 - 专注于服务端技术分享 远方的灯塔 - 专注于服务端技术分享
首页
  • Java SE
  • Struts2
  • Hibernate
  • MyBatis
  • JAX-WS
  • 并发
  • 分布式
  • Git
  • 文章分类
  • 文章标签
  • 文章归档
  • 《C程序设计语言》
心情随笔
友情链接
给我留言 (opens new window)
关于我
GitHub (opens new window)

Terwer Green

一个后端老菜鸟
首页
  • Java SE
  • Struts2
  • Hibernate
  • MyBatis
  • JAX-WS
  • 并发
  • 分布式
  • Git
  • 文章分类
  • 文章标签
  • 文章归档
  • 《C程序设计语言》
心情随笔
友情链接
给我留言 (opens new window)
关于我
GitHub (opens new window)
  • JavaSE

  • 开源框架

  • Linux

  • Struts2

  • Hibernate

  • Webservice

  • 分布式

    • RPC架构设计及IO模型
      • socket
        • socket网络编程
        • socket概述
        • socket整体流程
        • 代码实现
        • IO模型
        • IO模型说明
        • 阻塞与非阻塞
        • 同步和异步
        • BIO(同步阻塞)
        • 工作机制
        • BIO的问题分析
        • NIO(同步非阻塞)
        • AIO(异步非阻塞)
        • BIO、NIO、AIO的适用场景分析
    • NIO编程及其三大核心原理
    • NIO三大核心之缓冲区(Buffer)
    • NIO三大核心之通道(Channel)
    • NIO三大核心之选择器(Selector)
    • Netty核心原理
    • 线程模型以及传统IO阻塞模型
    • Reactor模型
    • Netty线程模型
    • Netty核心API介绍
    • Netty入门与异步模型
    • Netty高级进阶之Netty编解码器
    • Netty高级进阶之基于Netty的群聊天室案例
    • Netty高级进阶之基于Netty的HTTP服务器开发
    • Netty高级进阶之基于Netty的Websocket开发网页聊天室
    • Netty高级进阶之Netty中的粘包和拆包的解决方案
    • Nety源码剖析
    • 自定义RPC框架之分布式架构网络通信理论
    • 自定义RPC框架之基于Netty实现RPC框架
    • 分布式架构理论
    • 分布式理论之数据一致性
    • 分布式理论之CAP定理
    • 分布式理论之BASE定理
    • 分布式一致性协议之两阶段提交协议(2PC)
    • 分布式一致性协议之三阶段提交协议(3PC)
    • 分布式一致性协议之NWR协议
    • 分布式一致性协议之Gossip协议
    • 分布式一致性协议之Paxos协议
    • 分布式一致性协议之Raft协议
    • 分布式一致性协议之Lease机制
    • 分布式系统设计策略之心跳检测
    • 分布式系统设计策略之高可用
    • 分布式系统设计策略之容错性
    • 分布式系统设计策略之负载均衡
    • 分布式架构服务调用
    • 分布式服务治理之服务协调
    • 分布式服务治理之服务削峰
    • 分布式服务治理之服务降级
    • 分布式服务治理之服务限流
    • 分布式服务治理之服务熔断
    • 分布式服务治理之服务链路追踪
    • 架构设计基本原则之开闭原则(OCP)
    • 架构设计基本原则之单一职责原则(SRP)
    • 架构设计基本原则之接口隔离原则(ISP)
    • 架构设计基本原则之里式替换原则(LSP)
    • 架构设计基本原则之依赖倒置原则(DIP)
    • 架构设计基本原则知识扩展
    • 分布式架构知识拓展与总结
  • 分布式框架

  • 后端开发
  • 分布式
terwer
2022-04-16
目录

RPC架构设计及IO模型

本文阐述了socket编程、IO网络模型,以及各种IO模型的适用场景。

# RPC架构设计

文章更新历史

2022/05/10 feat:新增GO、C++、C支持

2022/05/10 feat:新增PHP支持

2022/05/09 feat:新增Scala、Grovvy、GraalVMJS支持

2022/05/09 feat:新增C#、F#支持

2022/05/09 feat:新增JRuby支持

2022/05/09 feat:新增Kotlin支持、Jython支持

2022/05/04 feat:修改相关描述。

2022/03/01 feat:初稿。

# socket

# socket网络编程

# socket概述

socket套接字是两台主机之间逻辑连接的端点。

TCP/IP协议是传输层协议,主要解决数据在网络中的传输

socket是网络通信之间的抽象接口,它包含网络通信的五种基础信息:连接使用的协议、本地主机的ip地址、本地进程协议端口、远程主机ip地址、远程进程的协议端口。

# socket整体流程

socket编程主要包括客户端和服务端两个方面。

首先在服务端创建一个服务端套接字(ServerSocket),并把它附加到一个端口上,服务端从这个端口监听链接。

端口范围是 0-65536 ,注意 0-1024 是特权服务保留的端口。可选择任意一个不被其他进程使用的端口。

客户端请求与服务端连接时,根据服务端的域名或者ip地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的操作可以像输入输出流一样操作。

image-20220417134215803

# 代码实现

  1. 服务端代码

      /**
       * 服务端
       *
       * @name: ServerDemo
       * @author: terwer
       * @date: 2022-05-08 14:20
       */
      object ServerDemo {
          @Throws(IOException::class)
          @JvmStatic
          fun main(args: Array<String>) {
              // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信
              val executorService = Executors.newCachedThreadPool()
              // 2.创建ServerSocket
              val serverSocket = ServerSocket(9999)
              println("服务器已启动")
              while (true) {
                  // 3.监听客户端
                  val socket = serverSocket.accept()
                  println("有客户端链接")
                  executorService.execute { handle(socket) }
              }
          }
      
          private fun handle(socket: Socket) {
              try {
                  println("线程ID:" + Thread.currentThread().id + ",线程名称:" + Thread.currentThread().name)
                  // 从连接中取出输入流
                  val inputStream = socket.getInputStream()
                  val b = ByteArray(1024)
                  val read = inputStream.read(b)
                  println("客户端发过来的消息:" + String(b, 0, read))
      
                  // 链接中取出输出流并回话
                  val outputStream = socket.getOutputStream()
                  outputStream.write("没有".toByteArray())
              } catch (e: Exception) {
                  e.printStackTrace()
              } finally {
                  try {
                      socket.close()
                  } catch (e: IOException) {
                      e.printStackTrace()
                  }
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      /**
       * 服务端
       *
       * @name: ServerDemo
       * @author: terwer
       * @date: 2022-04-17 14:20
       **/
      public class ServerDemo {
          public static void main(String[] args) throws IOException {
              // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信
              ExecutorService executorService = Executors.newCachedThreadPool();
              // 2.创建ServerSocket
              ServerSocket serverSocket = new ServerSocket(9999);
              System.out.println("服务器已启动");
              while (true) {
                  // 3.监听客户端
                  Socket socket = serverSocket.accept();
                  System.out.println("有客户端链接");
                  executorService.execute(new Runnable() {
                      @Override
                      public void run() {
                          handle(socket);
                      }
                  });
              }
          }
      
          private static void handle(Socket socket) {
              try {
                  System.out.println("线程ID:" + Thread.currentThread().getId() + ",线程名称:" + Thread.currentThread().getName());
                  // 从连接中取出输入流
                  InputStream inputStream = socket.getInputStream();
                  byte[] b = new byte[1024];
                  int read = inputStream.read(b);
                  System.out.println("客户端" + new String(b, 0, read));
      
                  // 链接中取出输出流并回话
                  OutputStream outputStream = socket.getOutputStream();
                  outputStream.write("没有".getBytes());
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  try {
                      socket.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      # -*- coding: utf-8 -*-
      
      from java.util.concurrent import Executors
      from java.net import ServerSocket
      from java.lang import String
      
      import jarray
      import sys
      
      # Jython版socket服务端
      #
      # @name: ServerDemo
      # @author: terwer
      # @date: 2022-05-09 15:48
      class ServerDemo(object):
      
          @classmethod
          def main(self, args):
              executorService = Executors.newCachedThreadPool()
              serverSocket = ServerSocket(9999)
              print("server is running")
              while True:
                  socket = serverSocket.accept()
                  # lambda <<argument(s)>> : <<function body>>
                  # name_combo = lambda first, last: first + ' ' + last
                  # name_combo('Jim','Baker')
                  runnable_lambda_fun = lambda: self.handle(socket)
                  executorService.execute(runnable_lambda_fun)
      
          @classmethod
          def handle(self, socket):
              try:
                  print("receiced socket here=>")
                  print(socket)
      
                  inputStream = socket.getInputStream()
                  b = jarray.zeros(1024, "b")
                  read = inputStream.read(b)
                  in_str = String(b, 0, read)
                  print("receice msg from client=>")
                  print(in_str)
      
                  outputStream = socket.getOutputStream()
                  out_bytes = map(ord, "msg from jython socket server")
                  outputStream.write(out_bytes)
              except Exception as e:
                  print("An error occured:")
                  print(e)
              finally:
                  try:
                      socket.close()
                  except IOError as e:
                      print("An error occured:")
                      print(e)
      
      if __name__ == "__main__":
          ServerDemo.main(sys.argv)
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      require 'java'
      
      java_import 'java.util.concurrent.Executors'
      java_import 'java.net.ServerSocket'
      java_import 'java.net.SocketInputStream'
      java_import 'java.lang.Byte'
      
      # JRuby版socket服务端
      #
      # https://github.com/jruby/jruby/wiki/JRuby-Reference#Arrays
      #
      # @name: ServerDemo
      # @author: terwer
      # @date: 2022-05-09 21:04
      class ServerDemo
        def self.main()
          executorService = Executors.newCachedThreadPool()
          serverSocket = ServerSocket.new(9999)
          puts("jbury server is running...")
          while (true)
            socket = serverSocket.accept()
            runnable_lambda_fun = -> { self.handle(socket) }
            executorService.execute(runnable_lambda_fun)
          end
        end
      
        def self.handle(socket)
          begin
            print("receiced socket here=>")
            puts(socket)
            inputStream = socket.getInputStream()
            b = Java::byte[1024].new
            read = inputStream.read(b)
            puts "received msg from client=> #{b}"
      
            outputStream = socket.getOutputStream()
            out_bytes = "server msg send from jruby".to_java_bytes
            outputStream.write(out_bytes)
          rescue => e
            socket.close()
            puts "Exception Occurred #{e.class}. Message: #{e.message}. Backtrace:  \n #{e.backtrace.join("\n")}"
            Rails.logger.error "Exception Occurred #{e.class}. Message: #{e.message}. Backtrace:  \n #{e.backtrace.join("\n")}"
          end
        end
      end
      
      ServerDemo.main()
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      /**
       * 服务端
       *
       * @name: ServerDemo
       * @author: terwer
       * @date: 2022-05-09 14:20
       * */
      object ServerDemo {
        @throws[IOException]
        def main(args: Array[String]) = { // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信
          val executorService = Executors.newCachedThreadPool
          // 2.创建ServerSocket
          val serverSocket = new ServerSocket(9999)
          System.out.println("服务器已启动")
          while ( {
            true
          }) { // 3.监听客户端
            val socket = serverSocket.accept
            System.out.println("有客户端链接")
            executorService.execute(() => handle(socket))
          }
        }
      
        private def handle(socket: Socket) = try {
          System.out.println("线程ID:" + Thread.currentThread.getId + ",线程名称:" + Thread.currentThread.getName)
          // 从连接中取出输入流
          val inputStream = socket.getInputStream
          val b = new Array[Byte](1024)
          val read = inputStream.read(b)
          System.out.println("客户端发过来的消息:" + new String(b, 0, read))
          // 链接中取出输出流并回话
          val outputStream = socket.getOutputStream
          outputStream.write("没有".getBytes)
        } catch {
          case e: Exception =>
            e.printStackTrace()
        } finally try socket.close()
        catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      /**
       * 服务端
       *
       * @name: ServerDemo
       * @author: terwer
       * @date: 2022-05-09 14:20
       * */
      class ServerDemo {
          static void main(String[] args) throws IOException {
              // 1.创建一个线程池,如果有客户端链接就创建一个线程与之通信
              ExecutorService executorService = Executors.newCachedThreadPool()
              // 2.创建ServerSocket
              ServerSocket serverSocket = new ServerSocket(9999)
              System.out.println('服务器已启动')
              while (true) {
                  // 3.监听客户端
                  Socket socket = serverSocket.accept()
                  System.out.println('有客户端链接')
                  executorService.execute(() -> handle(socket))
              }
          }
      
          private static void handle(Socket socket) {
              try {
                  System.out.println('线程ID:' + Thread.currentThread().id + ',线程名称:' + Thread.currentThread().name)
                  // 从连接中取出输入流
                  InputStream inputStream = socket.inputStream
                  byte[] b = new byte[1024]
                  int read = inputStream.read(b)
                  System.out.println('客户端发过来的消息:' + new String(b, 0, read))
      
                  // 链接中取出输出流并回话
                  OutputStream outputStream = socket.outputStream
                  outputStream.write('没有'.bytes)
              } catch (Exception e) {
                  e.printStackTrace()
              } finally {
                  try {
                      socket.close()
                  } catch (IOException e) {
                      e.printStackTrace()
                  }
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      <?php
      
      namespace com\terwergreen\php;
      
      /**
       * 服务端
       *
       * @name: ServerDemo
       * @author: terwer
       * @date: 2022-05-10 14:20
       **/
      class ServerDemo {
          public static function main($args) {
              $addr = "127.0.0.1";
              $port = 9999;
      
              // 创建server_socket
              $server_socket = socket_create(AF_INET, SOCK_STREAM, 0);
              // 绑定端口
              socket_bind($server_socket, $addr, $port);
              // 监听
              socket_listen($server_socket);
              echo "php版socket服务器启动成功\n";
      
              while (true) {
                  $socket = socket_accept($server_socket);
      
                  self::handle($socket);
              }
      
          }
      
          public static function handle($socket) {
              echo "处理客户端连接\n";
      
              $msg = socket_read($socket, 1024, PHP_BINARY_READ);
              echo "来自客户端的消息:$msg";
      
              $out_msg = "这是php版socket服务器发送过来的消息";
              socket_write($socket, $out_msg);
      
              socket_close($socket);
          }
      }
      
      $args = [];
      ServerDemo::main($args);
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      package main
      
      import (
      	"bytes"
      	"fmt"
      	"net"
      )
      
      /*
      * 服务端
      *
      * @name: ServerDemo
      * @author: terwer
      * @date: 2022-05-09 23:14
      **/
      func main() {
      	listen, _ := net.Listen("tcp", ":9999")
      	fmt.Println("go的socket服务器已启动")
      
      	for true {
      		conn, _ := listen.Accept()
      		handle(conn)
      	}
      }
      
      func handle(conn net.Conn) {
      	fmt.Println("处理客户端连接")
      
      	// 收消息
      	b := make([]byte, 1024)
      	read, err := conn.Read(b)
      	if err != nil {
      		return
      	}
      	var dataBuffer bytes.Buffer
      	dataBuffer.Write(b[:read])
      	fmt.Println("接收到来自客户端的消息:")
      	fmt.Print(dataBuffer.String())
      
      	// 发消息
      	var str string = "这是来着go的socket服务端发过来的消息"
      	var outBytes []byte = []byte(str)
      	conn.Write(outBytes)
      
      	// 关闭链接
      	conn.Close()
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      #include <iostream>
      #include <sys/socket.h>
      #include <netinet/in.h>
      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <string.h>
      #include <time.h>
      
      int main() {
          int serv_sock = 0, sock = 0;
          struct sockaddr_in serv_addr;
      
          char sendBuff[2015];
          time_t ticks;
      
          // 创建socket
          serv_sock = socket(AF_INET, SOCK_STREAM, 0);
          if (serv_sock < 0) {
              std::cout << "创建socket失败";
              exit(1);
          }
          memset(&serv_addr, '0', sizeof(serv_addr));
          memset(sendBuff, '0', sizeof(sendBuff));
      
          serv_addr.sin_family = AF_INET;
          serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);// htonl(inet_addr("127.0.0.1"));// 两种效一样
          serv_addr.sin_port = htons(9999);
      
          // 绑定ip与端口
          bind(serv_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
      
          // 开启监听
          listen(serv_sock, 10);
      
          std::cout << "C++版socket服务器已启动!" << std::endl;
      
          while (true) {
              try {
                  // 接收消息
                  sock = accept(serv_sock, (sockaddr *) NULL, NULL);
      
                  // 发送消息
                  snprintf(sendBuff, sizeof(sendBuff), "这是从C++版的socket服务器发送过来的消息:%.24s\r\n", ctime(&ticks));
                  write(sock, sendBuff, strlen(sendBuff));
      
                  // 关闭
                  // close(server_socket);
                  close(sock);
      
                  sleep(1);
              } catch (...) {
                  std::cout << "服务端异常";
                  break;
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      #include <stdio.h>
      #include <sys/types.h>
      #include <sys/socket.h>
      #include <stdlib.h>
      #include <string.h>
      #include <netinet/in.h>
      #include <stdbool.h>
      #include <time.h>
      #include <unistd.h>
      
      int main() {
          int serv_sock = 0, sock = 0;
          struct sockaddr_in serv_addr;
      
          char sendBuff[2015];
          time_t ticks;
      
          // 创建socket
          serv_sock = socket(AF_INET, SOCK_STREAM, 0);
          if (serv_sock < 0) {
              exit(1);
          }
          memset(&serv_addr, '0', sizeof(serv_addr));
          memset(sendBuff, '0', sizeof(sendBuff));
      
          serv_addr.sin_family = AF_INET;
          // serv_addr.sin_addr.s_addr = INADDR_ANY;
          serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
          // serv_addr.sin_port = 9999;
          serv_addr.sin_port = htons(9999);
      
          // 绑定ip与端口
          bind(serv_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
      
          // 开启监听
          listen(serv_sock, 10);
          printf("C语言版的socket服务器启动成功\n");
      
          while (true) {
              // 接收消息
              sock = accept(serv_sock, (struct sockaddr *) NULL, NULL);
      
              // 发送消息
              snprintf(sendBuff, sizeof(sendBuff), "这是从C语言版的socket服务器发送过来的消息:%.24s\r\n", ctime(&ticks));
              write(sock, sendBuff, strlen(sendBuff));
      
              // 关闭
              // close(server_socket);
              close(sock);
      
              sleep(1);
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      using System.Net;
      using System.Net.Sockets;
      using System.Text;
      
      namespace SocketDemo;
      
      /**
       * https://docs.microsoft.com/en-us/dotnet/framework/network-programming/asynchronous-server-socket-example
       */
      public class ServerDemo{
          public static void Main(string[] args){
              var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
              serverSocket.Bind(new IPEndPoint(IpToInt("127.0.0.1"), 9999));
              serverSocket.Listen();
              Console.WriteLine("C#版Socket服务端已启动");
              while (true)
              {
                  Socket socket = serverSocket.Accept();
                  ThreadPool.QueueUserWorkItem((state => { DoHandle(socket); }));
              }
          }
      
          private static void DoHandle(Socket socket){
              Console.WriteLine("处理客户端请求");
              Console.WriteLine(socket);
      
              byte[] in_bytes = new byte[1024];
              int len = socket.Receive(in_bytes);
              var in_str = Encoding.UTF8.GetString(in_bytes, 0, len);
              Console.WriteLine("接收到来着客户端的数据=>" + in_str);
      
              byte[] out_bytes = Encoding.UTF8.GetBytes("从C#服务端发送给客户端的数据");
              socket.Send(out_bytes);
          }
      
          private static uint IpToInt(string addr){
              return BitConverter.ToUInt32(IPAddress.Parse(addr).GetAddressBytes(), 0);
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      namespace global
      
      open System.Net
      open System.Net.Sockets
      open System.Text
      namespace SocketDemo
      
      open System
      open System.Threading
      
      type ServerDemo() =
          static member Main(args: string []) =
              let mutable serverSocket =
                  new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
      
              let mutable addr = IPAddress.Parse("127.0.0.1")
              let mutable endPoint = new IPEndPoint(addr, 9999)
              serverSocket.Bind(endPoint)
              serverSocket.Listen()
              Console.WriteLine("F#版Socket服务端已启动")
      
              while true do
                  let mutable (socket: Socket) = serverSocket.Accept()
      
                  ThreadPool.QueueUserWorkItem(fun state -> ServerDemo.DoHandle(socket))
                  |> ignore
      
              ()
      
          static member private DoHandle(socket: Socket) =
              Console.WriteLine("处理客户端请求")
              Console.WriteLine(socket)
              let mutable (in_bytes: byte []) = Array.zeroCreate 1024
              let mutable (len: int) = socket.Receive(in_bytes)
      
              let mutable in_str =
                  Encoding.UTF8.GetString(in_bytes, 0, len)
      
              Console.WriteLine("接收到来着客户端的数据=>" + in_str)
      
              let mutable (out_bytes: byte []) =
                  Encoding.UTF8.GetBytes("从F#服务端发送给客户端的数据")
      
              socket.Send(out_bytes) |> ignore
      
      module ServerDemo__run =
          [<EntryPoint>]
          let main args =
              ServerDemo.Main(args)
              0
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      // https://github.com/oracle/graaljs/blob/master/docs/user/FAQ.md
      
      const Executors = Java.type("java.util.concurrent.Executors")
      const ServerSocket = Java.type("java.net.ServerSocket")
      const Byte = Java.type("java.lang.Byte")
      const String = Java.type("java.lang.String")
      
      const serverSocket = new ServerSocket(9999)
      console.log("Node.js版socket服务器已启动.")
      
      while (true) {
          const socket = serverSocket.accept()
          handle(socket)
      }
      
      function handle(socket) {
          try {
              console.log("处理客户端链接")
      
              const inputStream = socket.getInputStream()
              var b = java.lang.reflect.Array.newInstance(java.lang.Byte.TYPE, 1024)
              const read = inputStream.read(b)
              const in_bytes = new String(b, 0, read)
              console.log("接收到来着客户端的消息", in_bytes)
      
              let outputSream = socket.getOutputStream()
              // TODO:中文乱码,NodeJS的Bug,https://juejin.cn/post/6981797254705184798
              const out_bytes = Buffer.from("msg from graaljs socket server", "utf8")
              outputSream.write(out_bytes)
          } catch (e) {
              console.log("系统异常:", e)
          } finally {
              if (null != socket) {
                  socket.close()
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      (ns com.terwergreen.clojure.ServerDemo
        (:import java.util.concurrent.Executors)
        (:import java.net.ServerSocket)
        )
      
      ;定义main函数
      (defn -main
        "main"
        []
        ;创建线程池
        (def executorService (Executors/newCachedThreadPool))
        ;(println executorService)
      
        ;socket通信
        (def serverSocket (new ServerSocket 9999))
        (println serverSocket)
      
        ;启动服务器
        (println "clojore socker server is runnning...")
        ;(.println (System/out) "clojore socker server is runnning...")
      
        (while true
          (def socket (.accept serverSocket))
      
          ;Runnable线程
          (def run_fun (fn [socket] (println "message")))
          ;放入线程池
          ;(def call_run_fun (run_fun socket))
          ;TODO:lambda传参有问题
          (.execute executorService call_run_fun)
      
          )
      
        )
      
      ;(defn handle [socket] (print "Hello"))
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      // Make sure to add code blocks to your code group
    • 客户端代码

      /**
       * 客户端
       *
       * @name: ClientDemo
       * @author: terwer
       * @date: 2022-04-17 15:30
       **/
      public class ClientDemo {
          public static void main(String[] args) throws IOException {
              while (true) {
                  // 1.创建客户端socket
                  Socket s = new Socket("127.0.0.1", 9999);
                  // 2.从连接中获取输出流并发送消息
                  OutputStream os = s.getOutputStream();
                  System.out.println("请输入:");
                  Scanner sc = new Scanner(System.in);
                  String msg = sc.nextLine();
                  os.write(msg.getBytes());
      
                  // 3.从连接中取出输入流并接受会话
                  InputStream is = s.getInputStream();
                  byte[] b = new byte[1024];
                  // 下面写法错了
                  // int read = is.read();
                  // 应该是
                  int read = is.read(b);
                  System.out.println("老板说:" + new String(b, 0, read).trim());
      
                  s.close();
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32

    参考:

    https://www.thegeekstuff.com/2011/12/c-socket-programming/ (opens new window)

    # IO模型

    # IO模型说明

    1. 简单理解:用什么样的通道进行数据的发送和接收。在很大程度上决定了程序通信的性能。

    2. Java支持三种网络编程模型I/O模式:BIO(同步阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞)

      # 阻塞与非阻塞

      指的是网络IO的线程是否处于阻塞或者等待状态

      线程访问资源,该资源是否准备就绪的一种处理方式

      image-20220417214737992

      # 同步和异步

      指的是数据的请求方式,同步和异步是请求数据的一种方式。

      image-20220417215010935

    # BIO(同步阻塞)

    Java BIO就是传统的socket编程

    BIO(blocking IO):同步阻塞,服务器实现方式为一个链接一个线程,即客户端有一个链接时,服务端就要启动一个线程进行处理,如果这个链接不作任何事情,会造成不必要的线程开销,可以通过线程池改善,实现多个客户端链接服务器。

    # 工作机制

    image-20220417230159590

    生活中的例子:

    image-20220417230242089

    # BIO的问题分析
    1. 每个请求都要创建独立的线程,与对应的客户端进行read,业务处理,数据write
    2. 并发量大的时候,需要创建大量的线程来处理链接,系统资源占用较大
    3. 链接建立后,如果当前线程没有数据可读,线程就阻塞在read上,造成线程资源浪费

    # NIO(同步非阻塞)

    同步非阻塞,服务器实现模式为一个线程处理多个请求(链接),客户端发送的链接请求会注册到多路复用器上,多路复用器轮训到链接有IO请求就进行处理。

    image-20220418134554952

    生活中的例子:

    image-20220418134644973

    # AIO(异步非阻塞)

    AIO引入了异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程。

    他的特点是先由操作系统完成后才通知服务端启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

    Proactor模式是一个消息异步通知的模式,Proactor通知的不是就绪事件,而是操作完成事件,这也是操作系统异步IO的主要模型。

    https://www.zhihu.com/question/26943938 (opens new window)

    生活中的例子:

    image-20220418135022363

    # BIO、NIO、AIO的适用场景分析

    1. BIO(同步阻塞模式)适用于连接数比较小,且固定的架构。对服务器资源的要求比较高,并发局限于应用中,jdk1.4以前的唯一选择,代码容易理解。
    2. NIO(同步非阻塞模式)适用于链接数目多且链接比较短(轻操作)的架构,比如聊天服务器、弹幕系统、服务器之间的通讯等。编程比较复杂,jdk1.4开始支持。
    3. AIO(异步非阻塞模式)适用于连接数目比较多并且连接时间比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作。编程比较复杂,jdk1.7开始支持。
    编辑 (opens new window)
    #rpc#arch#io
    上次更新: 2023/02/22, 13:47:25
    Apache-CXF简介与第一个JAX-WS的入门程序
    NIO编程及其三大核心原理

    ← Apache-CXF简介与第一个JAX-WS的入门程序 NIO编程及其三大核心原理→

    最近更新
    01
    解决css部分border被圆角切掉之后圆角的边框消失问题
    03-18
    02
    使用TypeScript开发一个自定义的Node-js前端开发脚手架
    03-08
    03
    Github-Actions使用release-please实现自动发版
    03-06
    更多文章>
    Theme by Vdoing | Copyright © 2011-2023 Terwer Green | MIT License | 粤ICP备2022020721号-1 | 百度统计
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式