Files
bl/modules/base/middleware/server.go

632 lines
18 KiB
Go
Raw Normal View History

package middleware
import (
"blazing/modules/config/model"
config "blazing/modules/config/service"
"bufio"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/grand"
"github.com/lxzan/gws"
"golang.org/x/crypto/ssh"
)
const (
PingInterval = 10 * time.Second
defaultWorkDir = "$HOME" // 全环境兼容
randomStrLength = 4 // 缩短随机串长度
cmdTimeout = 120 * time.Second // 延长超时适配Screen安装+下载)
)
// SSHConfig SSH连接配置
type SSHConfig struct {
ServerIP string `json:"server_ip"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
}
// WebSSHMessage 消息结构
type WebSSHMessage struct {
Type string `json:"type"`
Payload string `json:"payload"`
}
// TerminalSession 会话结构体(优化通道)
type TerminalSession struct {
WebSocket *gws.Conn
SSHClient *ssh.Client
SSHSession *ssh.Session
Stdin io.WriteCloser
Stdout io.Reader
Ready bool
outputBuf chan string // 增大通道缓存
closed atomic.Bool
}
type ServerHandler struct {
gws.BuiltinEventHandler
session *TerminalSession
model.ServerList
isinstall uint32
target net.Conn
}
// 生成随机文件名
func (s *ServerHandler) generateRandomFileName() string {
randBytes := make([]byte, randomStrLength)
_, err := rand.Read(randBytes)
if err != nil {
return fmt.Sprintf("logic_%d", time.Now().UnixNano())
}
randStr := hex.EncodeToString(randBytes)
timestamp := time.Now().Format("20060102150405")
return fmt.Sprintf("logic_%s_%s", timestamp, randStr)
}
// 执行脚本命令
func (s *ServerHandler) executeScript(scriptContent, scriptName string) (string, error) {
if s.session == nil || s.session.closed.Load() || !s.session.Ready {
return "", fmt.Errorf("session not ready")
}
// 生成临时脚本路径
scriptPath := fmt.Sprintf("/tmp/%s.sh", scriptName)
// 直接将脚本内容写入文件
writeScriptCmd := fmt.Sprintf("cat > '%s' << 'DEPLOYMENT_SCRIPT_END'\n%s\nDEPLOYMENT_SCRIPT_END\n", scriptPath, scriptContent)
_, err := s.session.Stdin.Write([]byte(writeScriptCmd))
if err != nil {
return "", err
}
// 等待一会儿确保脚本写入完成
time.Sleep(time.Second)
// 设置执行权限
_, err = s.session.Stdin.Write([]byte(fmt.Sprintf("chmod +x %s\n", scriptPath)))
if err != nil {
return "", err
}
// 等待权限设置完成
time.Sleep(time.Second)
// 执行脚本并将输出发送到WebSocket在命令中直接删除脚本文件
executeCmd := fmt.Sprintf("bash %s 2>&1; rm -f %s\n", scriptPath, scriptPath)
_, err = s.session.Stdin.Write([]byte(executeCmd))
if err != nil {
return "", err
}
// 读取脚本执行输出
output := ""
done := make(chan bool)
go func() {
defer func() {
if r := recover(); r != nil {
glog.Error(context.Background(), "Script execution goroutine panic:", r)
}
}()
scanner := bufio.NewScanner(s.session.Stdout)
for scanner.Scan() {
line := scanner.Text()
// 检测到脚本执行完成标记则退出
if strings.Contains(line, "#SCRIPT_EXECUTION_COMPLETE#") {
break
}
// 忽略一些可能导致连接断开的输出
if strings.Contains(line, "logout") || strings.Contains(line, "exit") {
continue
}
output += line + "\n"
s.sendTerminalOutput(s.session.WebSocket, line)
}
done <- true
}()
// 等待脚本执行完成或超时
select {
case <-done:
return strings.TrimSpace(output), nil
case <-time.After(cmdTimeout):
return strings.TrimSpace(output), nil
}
}
func (s *ServerHandler) executeFullDeployment() error {
s.sendTerminalOutput(s.session.WebSocket, "开始执行完整自动化部署流程...")
// 1. 获取并校验下载链接
2026-01-23 20:34:52 +00:00
filename := config.NewServerService().GetFile()
fileURL := "http://sun.72wo.cn/" + filename
fileURL = strings.TrimSpace(fileURL)
if fileURL == "" {
return fmt.Errorf("下载链接为空")
}
// 前置校验确保链接是合法的URL包含http/https
if !strings.HasPrefix(fileURL, "http://") && !strings.HasPrefix(fileURL, "https://") {
return fmt.Errorf("下载链接格式非法缺少http/https协议%s", fileURL)
}
s.sendTerminalOutput(s.session.WebSocket, fmt.Sprintf("【前置检查】有效下载链接:%s", fileURL))
// 2. 生成目标文件路径
randomFileName := s.generateRandomFileName()
remoteExePath := fmt.Sprintf("%s/%s", defaultWorkDir, randomFileName)
remoteWorkDir := defaultWorkDir
onlineID := fmt.Sprintf("%d", s.ServerList.OnlineID)
fixedScreenSession := "logic"
// 3. 定义部署脚本(给每个%s加唯一标记方便核对
deploymentScriptTpl := `
set -e
set -x
# ===== 检查并安装screen =====
echo "检查Screen是否已安装..."
if command -v screen &> /dev/null; then
echo "=== Screen已安装跳过 ==="
else
echo "=== Screen未安装开始安装 ==="
if command -v apt &> /dev/null; then
apt update -y && apt install -y screen
elif command -v yum &> /dev/null; then
yum install -y screen
elif command -v dnf &> /dev/null; then
dnf install -y screen
elif command -v pacman &> /dev/null; then
pacman -S --noconfirm screen
else
echo "❌ 不支持的包管理器无法安装Screen"
exit 1
fi
command -v screen || { echo "❌ Screen安装失败"; exit 1; }
fi
2026-01-23 14:59:15 +00:00
#!/bin/bash
2026-01-23 21:53:54 +00:00
# ===== 优雅终止logic会话解决screen -ls卡住问题=====
echo "===== 优雅终止logic会话 ====="
2026-01-23 14:59:15 +00:00
# 替换为你实际的screen名称示例logic
2026-01-23 21:53:54 +00:00
SCREEN_NAME="logic"
2026-01-23 14:59:15 +00:00
LOG_FILE="./screen_logic_exit.log"
# 调试开关如需详细日志取消set -x注释
set -o pipefail
export PS4='[DEBUG] ${BASH_SOURCE}:${LINENO} - ${FUNCNAME[0]:+${FUNCNAME[0]}(): }'
# set -x
2026-01-23 21:53:54 +00:00
# ========== 核心函数保留你的原版 ==========
2026-01-23 14:59:15 +00:00
wait_for_process_exit() {
local pidKilled=$1
2026-01-23 21:53:54 +00:00
local begin=$(date +%s)
2026-01-23 14:59:15 +00:00
local end
2026-01-23 21:53:54 +00:00
local timeout=60
2026-01-23 14:59:15 +00:00
while kill -0 $pidKilled > /dev/null 2>&1; do
2026-01-23 21:53:54 +00:00
echo -n "."
2026-01-23 14:59:15 +00:00
sleep 1;
end=$(date +%s)
if [ $((end - begin)) -gt $timeout ]; then
echo -e "\n⚠ 等待进程$pidKilled退出超时已等${timeout}秒)"
break;
fi
done
if ! kill -0 $pidKilled > /dev/null 2>&1; then
echo -e "\n✅ 进程$pidKilled已退出"
fi
}
pid_is_alive() {
local pid=$1
2026-01-23 14:59:15 +00:00
if [ -n "$pid" ] && kill -0 "$pid" > /dev/null 2>&1; then
2026-01-23 21:53:54 +00:00
return 0
else
2026-01-23 21:53:54 +00:00
return 1
fi
}
get_inner_procs() {
local screen_pid=$1
local procs=$(timeout 5 pstree -p "$screen_pid" 2>/dev/null | grep -oE '\([0-9]+\)' | tr -d '()' | grep -v "$screen_pid" | sort -u)
if [ -z "$procs" ]; then
procs=$(pgrep -f "SCREEN -S $SCREEN_NAME" 2>/dev/null | grep -v "$screen_pid")
fi
echo "$procs"
}
2026-01-22 16:01:52 +00:00
2026-01-23 21:53:54 +00:00
screen_send_cmd() {
local cmd="$1"
local screen_full_id="$2"
# ^M需手动生成Ctrl+v+回车
screen -S "$screen_full_id" -p 0 -X stuff "${cmd}^M"
sleep 1
}
# ========== 核心修复给screen -ls加超时避免卡住 ==========
echo "===== 检测screen会话5秒超时 ====="
# 关键修改给整个提取命令加5秒超时超时则直接设为空
SCREEN_FULL_ID=$(timeout 5 screen -ls 2>/dev/null | grep -E "[0-9]+\.$SCREEN_NAME" | grep -v "Dead\|Invalid" | head -1 | awk '{print $1}')
# 无论是否超时/卡住只要SCREEN_FULL_ID为空就直接走后续
if [ -z "$SCREEN_FULL_ID" ]; then
echo " 未找到$SCREEN_NAME会话或screen -ls执行超时直接执行后续脚本"
2026-01-23 14:59:15 +00:00
else
2026-01-23 21:53:54 +00:00
# 找到会话执行终止逻辑
SCREEN_PID=$(echo "$SCREEN_FULL_ID" | cut -d. -f1)
echo "✅ 找到$SCREEN_NAME主PID$SCREEN_PID | 完整ID$SCREEN_FULL_ID"
2026-01-23 14:59:15 +00:00
2026-01-23 21:53:54 +00:00
# 导出退出前日志
2026-01-23 14:59:15 +00:00
echo -e "\n===== 【退出前】$SCREEN_NAME 内程序实时log ====="
2026-01-23 21:53:54 +00:00
screen -S "$SCREEN_FULL_ID" -p 0 -X hardcopy -h "$LOG_FILE" 2>/dev/null
2026-01-23 14:59:15 +00:00
cat "$LOG_FILE"
2026-01-23 21:53:54 +00:00
# 给子进程发SIGTERM并等待
echo -e "\n===== 给子进程发优雅退出信号(SIGTERM) ====="
2026-01-23 14:59:15 +00:00
INNER_PROCS=$(get_inner_procs "$SCREEN_PID")
if [ -z "$INNER_PROCS" ]; then
echo " 未检测到$SCREEN_NAME下的子进程"
else
2026-01-23 14:59:15 +00:00
echo "待处理子进程:$INNER_PROCS"
for pid in $INNER_PROCS; do
if pid_is_alive "$pid"; then
echo -n "📌 终止进程$pid并等待退出"
2026-01-23 21:53:54 +00:00
kill -15 "$pid"
2026-01-23 14:59:15 +00:00
if [ $? -eq 0 ]; then
2026-01-23 21:53:54 +00:00
wait_for_process_exit "$pid"
else
2026-01-23 14:59:15 +00:00
echo "❌ 进程$pid发送SIGTERM失败"
fi
else
2026-01-23 14:59:15 +00:00
echo " 进程$pid已不存在跳过"
fi
done
2026-01-23 14:59:15 +00:00
fi
2026-01-23 21:53:54 +00:00
# 验证子进程退出状态
2026-01-23 14:59:15 +00:00
echo -e "\n===== 验证子进程退出状态 ====="
REMAIN_PROCS=$(get_inner_procs "$SCREEN_PID")
if [ -z "$REMAIN_PROCS" ]; then
echo "✅ $SCREEN_NAME内部所有程序已退出"
else
echo "⚠️ 仍有残留进程:$REMAIN_PROCS"
fi
2026-01-23 21:53:54 +00:00
# 投递exit命令退出screen
echo -e "\n===== 优雅退出screen会话 ====="
echo "向$SCREEN_NAME投递exit命令..."
screen_send_cmd "exit" "$SCREEN_FULL_ID"
# 等待并验证最终状态
echo -n "等待screen会话自动退出"
begin=$(date +%s)
while timeout 1 screen -ls "$SCREEN_NAME" 2>/dev/null | grep -q -E "[0-9]+\.$SCREEN_NAME"; do
echo -n "."
sleep 1
if [ $((date +%s - begin)) -gt 30 ]; then
echo -e "\n⚠ 等待screen退出超时30秒"
break
2026-01-22 16:01:52 +00:00
fi
2026-01-23 21:53:54 +00:00
done
2026-01-23 14:59:15 +00:00
echo -e "\n===== 最终验证 ====="
2026-01-23 21:53:54 +00:00
if timeout 1 screen -ls "$SCREEN_NAME" 2>/dev/null | grep -q -E "[0-9]+\.$SCREEN_NAME"; then
2026-01-23 14:59:15 +00:00
echo "❌ $SCREEN_NAME会话最终仍未退出"
else
echo "✅ $SCREEN_NAME会话已完全退出"
fi
fi
2026-01-23 21:53:54 +00:00
# ========== 后续脚本必执行永不卡住 ==========
2026-01-23 14:59:15 +00:00
echo -e "\n===== 终止logic会话流程结束继续执行后续脚本 ====="
2026-01-23 21:53:54 +00:00
# 示例后续逻辑
# echo "执行后续任务:备份日志、启动新进程等..."
# ===== 准备下载目录 =====
echo "创建工作目录:%s{work_dir}"
mkdir -p "%s{work_dir}" || { echo "❌ 创建目录失败"; exit 1; }
# ===== 下载程序 =====
echo "===== 开始下载程序 ====="
echo "下载链接:%s{file_url}"
echo "目标路径:%s{exe_path}"
# 删除旧文件
if [ -f "%s{exe_path}" ]; then
echo "删除旧文件:%s{exe_path}"
rm -f "%s{exe_path}"
fi
# ===== 准备下载目录 =====
echo "创建工作目录:%s{work_dir}"
mkdir -p "%s{work_dir}" || { echo "❌ 创建目录失败"; exit 1; }
# ===== 下载程序 =====
echo "===== 开始下载程序 ====="
echo "下载链接:%s{file_url}"
echo "目标路径:%s{exe_path}"
# 删除旧文件关键修复这里要判断目标路径不是下载链接
if [ -f "%s{exe_path}" ]; then
echo "删除旧文件:%s{exe_path}"
rm -f "%s{exe_path}"
fi
echo "开始下载..."
DOWNLOAD_SUCCESS=0
if command -v wget >/dev/null 2>&1; then
# 正确格式wget -O 目标路径 下载链接
wget --no-check-certificate --timeout=10 --tries=2 -O "%s{exe_path}" "%s{file_url}"
DOWNLOAD_SUCCESS=$?
elif command -v curl >/dev/null 2>&1; then
# 正确格式curl -o 目标路径 下载链接
curl -L --connect-timeout 10 --max-time 30 -o "%s{exe_path}" "%s{file_url}"
DOWNLOAD_SUCCESS=$?
else
echo "❌ 无wget/curl无法下载"
exit 1
fi
if [ $DOWNLOAD_SUCCESS -ne 0 ]; then
echo "❌ 下载失败,退出码:$DOWNLOAD_SUCCESS"
exit 1
fi
# 验证文件
if [ -f "%s{exe_path}" ] && [ -s "%s{exe_path}" ]; then
echo "=== 文件下载完成 ==="
ls -la "%s{exe_path}"
# 检查文件大小至少1KB
FILE_SIZE=$(stat -c%s "%s{exe_path}" 2>/dev/null || stat -f%z "%s{exe_path}" 2>/dev/null)
[ "$FILE_SIZE" -lt 1024 ] && { echo "❌ 文件太小($FILE_SIZE字节"; exit 1; }
else
echo "❌ 文件下载失败或为空"
exit 1
fi
# ===== 启动新程序 =====
echo "设置执行权限:%s{exe_path}"
chmod +x "%s{exe_path}" || { echo "❌ 设置权限失败"; exit 1; }
echo "启动Screen会话[%s{screen_name}]..."
screen -dmS "%s{screen_name}" bash -c '"%s{exe_path}" -id=%s{online_id} | tee -a "$HOME/run.log"'
2026-01-22 16:01:52 +00:00
sleep 2
if screen -ls | grep -q "%s{screen_name}"; then
echo "✅ 程序启动成功!会话名称:%s{screen_name}"
screen -ls
else
echo "❌ 程序启动失败,未创建[%s{screen_name}]会话"
screen -ls
exit 1
fi
echo "#SCRIPT_EXECUTION_COMPLETE#"
`
// 4. 定义参数映射(用占位符替换,彻底避免数错顺序)
deploymentScript := strings.ReplaceAll(deploymentScriptTpl, "%s{screen_name}", fixedScreenSession)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{work_dir}", remoteWorkDir)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{file_url}", fileURL)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{exe_path}", remoteExePath)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{online_id}", onlineID)
// 5. 执行脚本
_, err := s.executeScript(deploymentScript, "full_deployment_"+grand.S(10))
if err != nil {
return fmt.Errorf("执行部署脚本失败:%w", err)
}
// 6. 保存会话名称
2026-01-23 20:34:52 +00:00
config.NewServerService().SetServerScreen(s.ServerList.OnlineID, filename)
return nil
}
// executeCommand 执行单个命令并返回输出
func (s *ServerHandler) executeCommand(command string) (string, error) {
return s.executeScript(command, "cmd_"+grand.S(8))
}
// ---------------- 主流程OnOpen严格顺序执行 ----------------
func (s *ServerHandler) OnOpen(socket *gws.Conn) {
// 1. 建立SSH连接
sshConfig := &ssh.ClientConfig{
User: s.ServerList.Account,
Auth: []ssh.AuthMethod{ssh.Password(s.ServerList.Password)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: 30 * time.Second,
}
// 连接服务器(重试)
var sshClient *ssh.Client
var err error
for retry := 0; retry < 2; retry++ {
sshClient, err = ssh.Dial("tcp", s.ServerList.LoginAddr, sshConfig)
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
if err != nil {
s.sendError(socket, "SSH连接失败"+err.Error())
return
}
// 创建会话(重试)
var session *ssh.Session
for retry := 0; retry < 2; retry++ {
session, err = sshClient.NewSession()
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "创建SSH会话失败"+err.Error())
return
}
// 配置PTY通用配置
modes := ssh.TerminalModes{
ssh.ECHO: 1,
ssh.TTY_OP_ISPEED: 115200,
ssh.TTY_OP_OSPEED: 115200,
}
err = session.RequestPty("vt100", 100, 80, modes)
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "PTY请求失败"+err.Error())
return
}
// 获取Stdin/Stdout
stdin, err := session.StdinPipe()
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "获取Stdin失败"+err.Error())
return
}
stdout, err := session.StdoutPipe()
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "获取Stdout失败"+err.Error())
return
}
// 启动Shell
err = session.Shell()
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "启动Shell失败"+err.Error())
return
}
// 初始化会话
s.session = &TerminalSession{
WebSocket: socket,
SSHClient: sshClient,
SSHSession: session,
Stdin: stdin,
Stdout: stdout,
Ready: true,
outputBuf: make(chan string, 2048), // 增大缓存
closed: atomic.Bool{},
}
// 启动输出转发协程
s.startOutputForwarding()
s.sendSuccess(socket, "SSH连接成功会话已就绪")
// ---------------- 严格按顺序执行自动化部署 ----------------
if s.isinstall == 1 {
// 执行完整的自动化部署脚本
err := s.executeFullDeployment()
if err != nil {
s.sendError(socket, "自动化部署失败: "+err.Error())
return
}
s.sendSuccess(socket, "自动化部署完成")
}
}
// startOutputForwarding 启动输出转发协程
func (s *ServerHandler) startOutputForwarding() {
if s.session == nil {
return
}
go func() {
defer func() {
if r := recover(); r != nil {
glog.Error(context.Background(), "Output forwarding goroutine panic:", r)
}
}()
scanner := bufio.NewScanner(s.session.Stdout)
for scanner.Scan() {
line := scanner.Text()
s.sendTerminalOutput(s.session.WebSocket, line+"\r\n")
}
if err := scanner.Err(); err != nil && s.session != nil {
glog.Error(context.Background(), "读取 SSH 输出失败:", err)
s.sendError(s.session.WebSocket, "读取 SSH 输出失败: "+err.Error())
}
}()
}
// ---------------- 基础函数 ----------------
func (s *ServerHandler) OnMessage(socket *gws.Conn, gwsmessage *gws.Message) {
if s.session == nil || s.session.Stdin == nil {
s.sendError(socket, "SSH 会话未建立")
return
}
_, err := s.session.Stdin.Write([]byte(gwsmessage.Data.Bytes()))
if err != nil {
s.sendError(socket, "写入消息失败:"+err.Error())
}
}
func (s *ServerHandler) OnClose(socket *gws.Conn, err error) {
glog.Debug(context.Background(), "连接断开:", err)
if s.session != nil {
s.session.closed.Store(true)
if s.session.SSHSession != nil {
s.session.SSHSession.Close()
}
if s.session.SSHClient != nil {
s.session.SSHClient.Close()
}
s.session = nil
}
}
// 发送终端输出
func (s *ServerHandler) sendTerminalOutput(ws *gws.Conn, output string) {
msg := WebSSHMessage{Type: "output", Payload: output}
data, _ := json.Marshal(msg)
ws.WriteMessage(gws.OpcodeText, data)
}
// 发送成功消息
func (s *ServerHandler) sendSuccess(ws *gws.Conn, msg string) {
res := WebSSHMessage{Type: "success", Payload: msg}
data, _ := json.Marshal(res)
ws.WriteMessage(gws.OpcodeText, data)
}
// 发送错误消息
func (s *ServerHandler) sendError(ws *gws.Conn, msg string) {
res := WebSSHMessage{Type: "error", Payload: msg}
data, _ := json.Marshal(res)
ws.WriteMessage(gws.OpcodeText, data)
}