feat: 添加WebSSH功能并重构塔服务

- 添加WebSSH中间件,支持通过
This commit is contained in:
2026-01-09 00:43:06 +08:00
parent 971abd29ab
commit 54e0649313
6 changed files with 320 additions and 227 deletions

View File

@@ -1,66 +1,296 @@
package middleware
import (
"bufio"
"context"
"net"
"encoding/json"
"fmt"
"io"
"sync"
"time"
"github.com/gogf/gf/v2/os/glog"
"github.com/lxzan/gws"
"golang.org/x/crypto/ssh"
)
const PingInterval = 10 * time.Second
type Handler struct {
gws.BuiltinEventHandler
port int
target net.Conn
// SSHConfig SSH 连接配置
type SSHConfig struct {
ServerIP string `json:"server_ip"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
}
// WebSSHMessage WebSSH 消息结构
type WebSSHMessage struct {
Type string `json:"type"` // "connect", "command", "resize", "disconnect"
Payload string `json:"payload"` // JSON encoded data
}
// TerminalSession 终端会话
type TerminalSession struct {
WebSocket *gws.Conn
SSHClient *ssh.Client
SSHSession *ssh.Session
Stdin io.WriteCloser
Stdout io.Reader
Mutex sync.Mutex
}
type Handler struct {
gws.BuiltinEventHandler
session *TerminalSession // 保存当前会话
}
// OnOpen 连接建立时的处理
func (c *Handler) OnOpen(socket *gws.Conn) {
// target, err := net.Dial("tcp", "127.0.0.1:"+gconv.String(c.port))
_ = socket.SetReadDeadline(time.Now().Add(60 * time.Second))
glog.Info(context.Background(), "WebSSH WebSocket 连接建立")
}
// if err != nil {
// glog.Debug(context.Background(), "连接失败")
// }
// c.target = target
// //errChan := make(chan error, 2)
// if c.target == nil {
// return
// }
// go func(conn net.Conn, socket *gws.Conn) {
// reader := bufio.NewReader(conn)
// LOOP:
// for {
// OnMessage 处理收到的消息
func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
// // select {
// // default:
// packlen, err := reader.Peek(4)
// 解析消息
var webSSHMsg WebSSHMessage
err := json.Unmarshal(message.Data, &webSSHMsg)
if err != nil {
glog.Error(context.Background(), "解析 WebSSH 消息失败:", err)
c.sendError(socket, "无效的消息格式: "+err.Error())
return
}
// if err != nil {
// socket.WriteClose(1000, nil)
// break LOOP
// }
switch webSSHMsg.Type {
case "connect":
c.handleConnect(socket, webSSHMsg.Payload)
case "command":
c.handleCommand(socket, webSSHMsg.Payload)
case "resize":
c.handleResize(socket, webSSHMsg.Payload)
case "disconnect":
c.handleDisconnect(socket)
default:
glog.Warning(context.Background(), "未知的 WebSSH 消息类型:", webSSHMsg.Type)
}
}
// length := int32(binary.BigEndian.Uint32(packlen))
// handleConnect 处理连接请求
func (c *Handler) handleConnect(socket *gws.Conn, payload string) {
var config SSHConfig
err := json.Unmarshal([]byte(payload), &config)
if err != nil {
glog.Error(context.Background(), "解析 SSH 配置失败:", err)
c.sendError(socket, "无效的 SSH 配置: "+err.Error())
return
}
// data := make([]byte, length)
// io.ReadFull(reader, data)
// 设置默认端口
if config.Port == "" {
config.Port = "22"
}
// //pack_Ver := data[4] //因为包体已经解析所以这里直接取0
// // var pack = make([]byte, length)
// SSH 客户端配置
sshConfig := &ssh.ClientConfig{
User: config.Username,
Auth: []ssh.AuthMethod{
ssh.Password(config.Password),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // 生产环境应使用更安全的验证
Timeout: 30 * time.Second,
}
// socket.WriteMessage(gws.OpcodeBinary, data)
// //t.event.RecvHandler(pack)
// // client.OnReceiveBase(client, pack, length)
// 连接到 SSH 服务器
addr := fmt.Sprintf("%s:%s", config.ServerIP, config.Port)
sshClient, err := ssh.Dial("tcp", addr, sshConfig)
if err != nil {
glog.Error(context.Background(), "SSH 连接失败:", err)
c.sendError(socket, "SSH 连接失败: "+err.Error())
return
}
// }
// }(c.target, socket)
// //err = <-errChan
// if err != io.EOF {
// log.Println("proxy error:", err)
// 创建 SSH 会话
session, err := sshClient.NewSession()
if err != nil {
glog.Error(context.Background(), "创建 SSH 会话失败:", err)
sshClient.Close()
c.sendError(socket, "创建 SSH 会话失败: "+err.Error())
return
}
// }
// 设置终端
modes := ssh.TerminalModes{
ssh.ECHO: 1,
ssh.TTY_OP_ISPEED: 14400,
ssh.TTY_OP_OSPEED: 14400,
}
err = session.RequestPty("xterm", 80, 40, modes)
if err != nil {
glog.Error(context.Background(), "请求 PTY 失败:", err)
session.Close()
sshClient.Close()
c.sendError(socket, "请求 PTY 失败: "+err.Error())
return
}
// 获取标准输入输出
stdin, err := session.StdinPipe()
if err != nil {
glog.Error(context.Background(), "获取 SSH 输入管道失败:", err)
session.Close()
sshClient.Close()
c.sendError(socket, "获取 SSH 输入管道失败: "+err.Error())
return
}
stdout, err := session.StdoutPipe()
if err != nil {
glog.Error(context.Background(), "获取 SSH 输出管道失败:", err)
session.Close()
sshClient.Close()
c.sendError(socket, "获取 SSH 输出管道失败: "+err.Error())
return
}
// 启动会话
err = session.Shell()
if err != nil {
glog.Error(context.Background(), "启动 SSH Shell 失败:", err)
session.Close()
sshClient.Close()
c.sendError(socket, "启动 SSH Shell 失败: "+err.Error())
return
}
// 保存会话信息
c.session = &TerminalSession{
WebSocket: socket,
SSHClient: sshClient,
SSHSession: session,
Stdin: stdin,
Stdout: stdout,
}
// 启动输出转发协程
c.startOutputForwarding()
c.sendSuccess(socket, "SSH 连接成功")
}
// startOutputForwarding 启动输出转发协程
func (c *Handler) startOutputForwarding() {
if c.session == nil {
return
}
go func() {
scanner := bufio.NewScanner(c.session.Stdout)
for scanner.Scan() {
line := scanner.Text()
c.sendTerminalOutput(c.session.WebSocket, line+"\r\n")
}
if err := scanner.Err(); err != nil && c.session != nil {
glog.Error(context.Background(), "读取 SSH 输出失败:", err)
c.sendError(c.session.WebSocket, "读取 SSH 输出失败: "+err.Error())
}
}()
}
// handleCommand 处理命令输入
func (c *Handler) handleCommand(socket *gws.Conn, payload string) {
if c.session == nil || c.session.Stdin == nil {
c.sendError(socket, "SSH 会话未建立")
return
}
_, err := c.session.Stdin.Write([]byte(payload))
if err != nil {
glog.Error(context.Background(), "写入 SSH 命令失败:", err)
c.sendError(socket, "写入 SSH 命令失败: "+err.Error())
}
}
// handleResize 处理终端大小调整
func (c *Handler) handleResize(socket *gws.Conn, payload string) {
if c.session == nil || c.session.SSHSession == nil {
c.sendError(socket, "SSH 会话未建立")
return
}
// 解析终端大小
var size struct {
Width int `json:"width"`
Height int `json:"height"`
}
err := json.Unmarshal([]byte(payload), &size)
if err != nil {
c.sendError(socket, "无效的终端大小参数")
return
}
// 调整终端大小
err = c.session.SSHSession.WindowChange(size.Height, size.Width)
if err != nil {
glog.Error(context.Background(), "调整终端大小失败:", err)
c.sendError(socket, "调整终端大小失败: "+err.Error())
return
}
c.sendSuccess(socket, "终端大小已调整")
}
// handleDisconnect 处理断开连接
func (c *Handler) handleDisconnect(socket *gws.Conn) {
c.cleanup()
c.sendSuccess(socket, "已断开连接")
}
// cleanup 清理会话资源
func (c *Handler) cleanup() {
if c.session != nil {
if c.session.SSHSession != nil {
c.session.SSHSession.Close()
}
if c.session.SSHClient != nil {
c.session.SSHClient.Close()
}
c.session = nil
}
}
// sendTerminalOutput 发送终端输出到 WebSocket
func (c *Handler) sendTerminalOutput(ws *gws.Conn, output string) {
msg := WebSSHMessage{
Type: "output",
Payload: output,
}
data, _ := json.Marshal(msg)
ws.WriteMessage(gws.OpcodeText, data)
}
// sendSuccess 发送成功消息
func (c *Handler) sendSuccess(ws *gws.Conn, message string) {
msg := WebSSHMessage{
Type: "success",
Payload: message,
}
data, _ := json.Marshal(msg)
ws.WriteMessage(gws.OpcodeText, data)
}
// sendError 发送错误消息
func (c *Handler) sendError(ws *gws.Conn, message string) {
msg := WebSSHMessage{
Type: "error",
Payload: message,
}
data, _ := json.Marshal(msg)
ws.WriteMessage(gws.OpcodeText, data)
}
func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
@@ -70,61 +300,7 @@ func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}
func (c *Handler) OnMessage(socket *gws.Conn, gwsmessage *gws.Message) {
if c.target != nil {
c.target.Write(gwsmessage.Bytes())
}
//fmt.Println(gwsmessage.Bytes())
}
func (c *Handler) OnClose(socket *gws.Conn, err error) {
glog.Debug(context.Background(), "断开连接")
if c.target != nil {
c.target.Close()
}
}
// RemoveSocket 移除WebSocket连接
func messeunpack(id int64, x []byte) {
// json := gjson.ParseBytes(x)
// json.ForEach(func(key, value gjson.Result) bool {
// fmt.Println(key, value)
// numeric := regexp.MustCompile(`\d`).MatchString(value.String())
// var ob int
// if numeric { //如果正确 则使用常量
// } else { //如果错误,则使用变量,即从数据库查询常量定义值是多少
// }
// work.User_script(uint(id), strings.Trim(message.Data.String(), " "))
// // switch s:=value.String() {
// // case regexp.MustCompile(`\d`).MatchString(s):
// // }
// return true
// })
// json.Get("1").String()
// var ob int
// numeric := regexp.MustCompile(`\d`).MatchString(json.Get("1").String())
// if numeric { //如果正确 则使用常量
// ob, _ = strconv.Atoi(json.Get("1").String())
// } else { //如果错误,则使用变量,即从数据库查询常量定义值是多少
// ob, _ = strconv.Atoi(json.Get("1").String())
// }
// switch ob {
// case 1: //发包函数
// work.User_script(uint(id), strings.Trim(message.Data.String(), " "))
// }
// Go 1.9.7 onwards only.
glog.Debug(context.Background(), "WebSSH 连接断开:", err)
c.cleanup()
}