Files
bl/modules/base/middleware/server.go
2026-01-23 14:59:15 +00:00

637 lines
19 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package middleware
import (
"blazing/modules/config/model"
config "blazing/modules/config/service"
"bufio"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/grand"
"github.com/lxzan/gws"
"golang.org/x/crypto/ssh"
)
const (
PingInterval = 10 * time.Second
defaultWorkDir = "$HOME" // 全环境兼容
randomStrLength = 4 // 缩短随机串长度
cmdTimeout = 120 * time.Second // 延长超时适配Screen安装+下载)
)
// SSHConfig SSH连接配置
type SSHConfig struct {
ServerIP string `json:"server_ip"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
}
// WebSSHMessage 消息结构
type WebSSHMessage struct {
Type string `json:"type"`
Payload string `json:"payload"`
}
// TerminalSession 会话结构体(优化通道)
type TerminalSession struct {
WebSocket *gws.Conn
SSHClient *ssh.Client
SSHSession *ssh.Session
Stdin io.WriteCloser
Stdout io.Reader
Ready bool
outputBuf chan string // 增大通道缓存
closed atomic.Bool
}
type ServerHandler struct {
gws.BuiltinEventHandler
session *TerminalSession
model.ServerList
isinstall uint32
target net.Conn
}
// 生成随机文件名
func (s *ServerHandler) generateRandomFileName() string {
randBytes := make([]byte, randomStrLength)
_, err := rand.Read(randBytes)
if err != nil {
return fmt.Sprintf("logic_%d", time.Now().UnixNano())
}
randStr := hex.EncodeToString(randBytes)
timestamp := time.Now().Format("20060102150405")
return fmt.Sprintf("logic_%s_%s", timestamp, randStr)
}
// 执行脚本命令
func (s *ServerHandler) executeScript(scriptContent, scriptName string) (string, error) {
if s.session == nil || s.session.closed.Load() || !s.session.Ready {
return "", fmt.Errorf("session not ready")
}
// 生成临时脚本路径
scriptPath := fmt.Sprintf("/tmp/%s.sh", scriptName)
// 直接将脚本内容写入文件
writeScriptCmd := fmt.Sprintf("cat > '%s' << 'DEPLOYMENT_SCRIPT_END'\n%s\nDEPLOYMENT_SCRIPT_END\n", scriptPath, scriptContent)
_, err := s.session.Stdin.Write([]byte(writeScriptCmd))
if err != nil {
return "", err
}
// 等待一会儿确保脚本写入完成
time.Sleep(time.Second)
// 设置执行权限
_, err = s.session.Stdin.Write([]byte(fmt.Sprintf("chmod +x %s\n", scriptPath)))
if err != nil {
return "", err
}
// 等待权限设置完成
time.Sleep(time.Second)
// 执行脚本并将输出发送到WebSocket在命令中直接删除脚本文件
executeCmd := fmt.Sprintf("bash %s 2>&1; rm -f %s\n", scriptPath, scriptPath)
_, err = s.session.Stdin.Write([]byte(executeCmd))
if err != nil {
return "", err
}
// 读取脚本执行输出
output := ""
done := make(chan bool)
go func() {
defer func() {
if r := recover(); r != nil {
glog.Error(context.Background(), "Script execution goroutine panic:", r)
}
}()
scanner := bufio.NewScanner(s.session.Stdout)
for scanner.Scan() {
line := scanner.Text()
// 检测到脚本执行完成标记则退出
if strings.Contains(line, "#SCRIPT_EXECUTION_COMPLETE#") {
break
}
// 忽略一些可能导致连接断开的输出
if strings.Contains(line, "logout") || strings.Contains(line, "exit") {
continue
}
output += line + "\n"
s.sendTerminalOutput(s.session.WebSocket, line)
}
done <- true
}()
// 等待脚本执行完成或超时
select {
case <-done:
return strings.TrimSpace(output), nil
case <-time.After(cmdTimeout):
return strings.TrimSpace(output), nil
}
}
func (s *ServerHandler) executeFullDeployment() error {
s.sendTerminalOutput(s.session.WebSocket, "开始执行完整自动化部署流程...")
// 1. 获取并校验下载链接
fileURL := config.NewServerService().GetFile()
fileURL = strings.TrimSpace(fileURL)
if fileURL == "" {
return fmt.Errorf("下载链接为空")
}
// 前置校验确保链接是合法的URL包含http/https
if !strings.HasPrefix(fileURL, "http://") && !strings.HasPrefix(fileURL, "https://") {
return fmt.Errorf("下载链接格式非法缺少http/https协议%s", fileURL)
}
s.sendTerminalOutput(s.session.WebSocket, fmt.Sprintf("【前置检查】有效下载链接:%s", fileURL))
// 2. 生成目标文件路径
randomFileName := s.generateRandomFileName()
remoteExePath := fmt.Sprintf("%s/%s", defaultWorkDir, randomFileName)
remoteWorkDir := defaultWorkDir
onlineID := fmt.Sprintf("%d", s.ServerList.OnlineID)
fixedScreenSession := "logic"
// 3. 定义部署脚本(给每个%s加唯一标记方便核对
deploymentScriptTpl := `
set -e
set -x
# ===== 检查并安装screen =====
echo "检查Screen是否已安装..."
if command -v screen &> /dev/null; then
echo "=== Screen已安装跳过 ==="
else
echo "=== Screen未安装开始安装 ==="
if command -v apt &> /dev/null; then
apt update -y && apt install -y screen
elif command -v yum &> /dev/null; then
yum install -y screen
elif command -v dnf &> /dev/null; then
dnf install -y screen
elif command -v pacman &> /dev/null; then
pacman -S --noconfirm screen
else
echo "❌ 不支持的包管理器无法安装Screen"
exit 1
fi
command -v screen || { echo "❌ Screen安装失败"; exit 1; }
fi
#!/bin/bash
# ===== 优雅终止logic会话先等内部程序退出 → 再退screen=====
echo "===== 优雅终止logic会话 ====="
SCREEN_PID=""
# 替换为你实际的screen名称示例logic
SCREEN_NAME="%s{screen_name}"
# 日志文件路径(可根据需要调整)
LOG_FILE="./screen_logic_exit.log"
# 调试开关如需详细日志取消set -x注释
set -o pipefail
export PS4='[DEBUG] ${BASH_SOURCE}:${LINENO} - ${FUNCNAME[0]:+${FUNCNAME[0]}(): }'
# set -x
# ========== 核心函数基于kill -0的等待进程退出函数 ==========
# 等待进程结束带60秒超时用kill -0检测存活输出进度点
wait_for_process_exit() {
local pidKilled=$1
local begin=$(date +%s) # 记录开始时间(秒)
local end
local timeout=60 # 最大等待时间(秒)
# 循环检测进程是否存活
while kill -0 $pidKilled > /dev/null 2>&1; do
echo -n "." # 输出进度点,直观显示等待过程
sleep 1;
# 检查是否超时
end=$(date +%s)
if [ $((end - begin)) -gt $timeout ]; then
echo -e "\n⚠ 等待进程$pidKilled退出超时已等${timeout}秒)"
break;
fi
done
# 最终状态提示
if ! kill -0 $pidKilled > /dev/null 2>&1; then
echo -e "\n✅ 进程$pidKilled已退出"
fi
}
# 定义检查PID是否存活的函数复用kill -0逻辑
pid_is_alive() {
local pid=$1
if [ -n "$pid" ] && kill -0 "$pid" > /dev/null 2>&1; then
return 0 # PID存活
else
return 1 # PID不存在
fi
}
# 定义获取screen会话下的所有子进程PID
get_inner_procs() {
local screen_pid=$1
# 5秒超时避免pstree卡住
local procs=$(timeout 5 pstree -p "$screen_pid" 2>/dev/null | grep -oE '\([0-9]+\)' | tr -d '()' | grep -v "$screen_pid" | sort -u)
# 兜底pstree失败时用pgrep按screen名称查找
if [ -z "$procs" ]; then
procs=$(pgrep -f "SCREEN -S $SCREEN_NAME" 2>/dev/null | grep -v "$screen_pid")
fi
echo "$procs"
}
# 第一步提取screen主PID并校验
SCREEN_PID=$(screen -ls "$SCREEN_NAME" | grep -oE '[0-9]+\.'"$SCREEN_NAME" | head -1 | cut -d. -f1)
if [ -z "$SCREEN_PID" ]; then
echo " 未找到$SCREEN_NAME会话对应的PID跳过终止流程"
# 去掉exit 1直接跳过后续逻辑不影响其他脚本执行
else
# ========== 仅当找到PID时才执行以下终止流程 ==========
echo "✅ 找到$SCREEN_NAME主PID$SCREEN_PID"
# 第二步:导出【退出前】的实时日志(终端+文件双输出)
echo -e "\n===== 【退出前】$SCREEN_NAME 内程序实时log ====="
screen -S "$SCREEN_NAME" -p 0 -X hardcopy -h "$LOG_FILE" 2>/dev/null
cat "$LOG_FILE"
# 第三步给所有子进程发SIGTERM信号并等待进程退出
echo -e "\n===== 开始给所有子进程发优雅退出信号(SIGTERM) ====="
INNER_PROCS=$(get_inner_procs "$SCREEN_PID")
if [ -z "$INNER_PROCS" ]; then
echo " 未检测到$SCREEN_NAME下的子进程"
else
echo "待处理子进程:$INNER_PROCS"
for pid in $INNER_PROCS; do
if pid_is_alive "$pid"; then
echo -n "📌 终止进程$pid并等待退出"
kill -15 "$pid" # 发送SIGTERM优雅退出信号
if [ $? -eq 0 ]; then
wait_for_process_exit "$pid" # 调用等待函数
else
echo "❌ 进程$pid发送SIGTERM失败"
fi
else
echo " 进程$pid已不存在跳过"
fi
done
fi
# 验证子进程是否全部退出
echo -e "\n===== 验证子进程退出状态 ====="
REMAIN_PROCS=$(get_inner_procs "$SCREEN_PID")
INNER_PROC_EXIST=true
if [ -z "$REMAIN_PROCS" ]; then
INNER_PROC_EXIST=false
echo "✅ $SCREEN_NAME内部所有程序已退出"
else
echo "⚠️ 仍有残留进程:$REMAIN_PROCS"
fi
# 退出screen会话
if [ "$INNER_PROC_EXIST" = false ]; then
echo "内部程序已退出,开始退出$SCREEN_NAME会话..."
# 尝试正常退出screen会话
if screen -S "$SCREEN_NAME" -X quit 2>/dev/null; then
echo "✅ $SCREEN_NAME会话已通过screen -X quit退出"
else
echo " screen -X quit执行失败检查PID是否存活..."
# 仅当PID存活时执行kill
if pid_is_alive "$SCREEN_PID"; then
echo "📌 PID $SCREEN_PID 仍存活尝试kill终止..."
timeout 5 kill -15 "$SCREEN_PID" 2>/dev/null
if [ $? -eq 0 ]; then
echo "✅ 已给screen主进程$SCREEN_PID发送SIGTERM信号"
else
echo "⚠️ kill $SCREEN_PID 失败"
fi
else
echo " PID $SCREEN_PID 已不存在跳过kill操作"
fi
fi
sleep 2
else
echo "⚠️ 内部程序未完全退出跳过退出screen会话"
fi
# 最终验证
echo -e "\n===== 最终验证 ====="
if screen -ls "$SCREEN_NAME" 2>/dev/null | grep -q -E "[0-9]+\.$SCREEN_NAME"; then
echo "❌ $SCREEN_NAME会话最终仍未退出"
else
echo "✅ $SCREEN_NAME会话已完全退出"
fi
fi
# 后续脚本可以从这里继续执行,不受上述逻辑影响
echo -e "\n===== 终止logic会话流程结束继续执行后续脚本 ====="
# ===== 准备下载目录 =====
echo "创建工作目录:%s{work_dir}"
mkdir -p "%s{work_dir}" || { echo "❌ 创建目录失败"; exit 1; }
# ===== 下载程序 =====
echo "===== 开始下载程序 ====="
echo "下载链接:%s{file_url}"
echo "目标路径:%s{exe_path}"
# 删除旧文件
if [ -f "%s{exe_path}" ]; then
echo "删除旧文件:%s{exe_path}"
rm -f "%s{exe_path}"
fi
# ===== 准备下载目录 =====
echo "创建工作目录:%s{work_dir}"
mkdir -p "%s{work_dir}" || { echo "❌ 创建目录失败"; exit 1; }
# ===== 下载程序 =====
echo "===== 开始下载程序 ====="
echo "下载链接:%s{file_url}"
echo "目标路径:%s{exe_path}"
# 删除旧文件(关键修复:这里要判断目标路径,不是下载链接)
if [ -f "%s{exe_path}" ]; then
echo "删除旧文件:%s{exe_path}"
rm -f "%s{exe_path}"
fi
echo "开始下载..."
DOWNLOAD_SUCCESS=0
if command -v wget >/dev/null 2>&1; then
# 正确格式wget -O 目标路径 下载链接
wget --no-check-certificate --timeout=10 --tries=2 -O "%s{exe_path}" "%s{file_url}"
DOWNLOAD_SUCCESS=$?
elif command -v curl >/dev/null 2>&1; then
# 正确格式curl -o 目标路径 下载链接
curl -L --connect-timeout 10 --max-time 30 -o "%s{exe_path}" "%s{file_url}"
DOWNLOAD_SUCCESS=$?
else
echo "❌ 无wget/curl无法下载"
exit 1
fi
if [ $DOWNLOAD_SUCCESS -ne 0 ]; then
echo "❌ 下载失败,退出码:$DOWNLOAD_SUCCESS"
exit 1
fi
# 验证文件
if [ -f "%s{exe_path}" ] && [ -s "%s{exe_path}" ]; then
echo "=== 文件下载完成 ==="
ls -la "%s{exe_path}"
# 检查文件大小至少1KB
FILE_SIZE=$(stat -c%s "%s{exe_path}" 2>/dev/null || stat -f%z "%s{exe_path}" 2>/dev/null)
[ "$FILE_SIZE" -lt 1024 ] && { echo "❌ 文件太小($FILE_SIZE字节"; exit 1; }
else
echo "❌ 文件下载失败或为空"
exit 1
fi
# ===== 启动新程序 =====
echo "设置执行权限:%s{exe_path}"
chmod +x "%s{exe_path}" || { echo "❌ 设置权限失败"; exit 1; }
echo "启动Screen会话[%s{screen_name}]..."
screen -dmS "%s{screen_name}" bash -c '"%s{exe_path}" -id=%s{online_id} | tee -a "$HOME/run.log"'
sleep 2
if screen -ls | grep -q "%s{screen_name}"; then
echo "✅ 程序启动成功!会话名称:%s{screen_name}"
screen -ls
else
echo "❌ 程序启动失败,未创建[%s{screen_name}]会话"
screen -ls
exit 1
fi
echo "#SCRIPT_EXECUTION_COMPLETE#"
`
// 4. 定义参数映射(用占位符替换,彻底避免数错顺序)
deploymentScript := strings.ReplaceAll(deploymentScriptTpl, "%s{screen_name}", fixedScreenSession)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{work_dir}", remoteWorkDir)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{file_url}", fileURL)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{exe_path}", remoteExePath)
deploymentScript = strings.ReplaceAll(deploymentScript, "%s{online_id}", onlineID)
// 5. 执行脚本
_, err := s.executeScript(deploymentScript, "full_deployment_"+grand.S(10))
if err != nil {
return fmt.Errorf("执行部署脚本失败:%w", err)
}
// 6. 保存会话名称
config.NewServerService().SetServerScreen(s.ServerList.OnlineID, fixedScreenSession)
s.sendTerminalOutput(s.session.WebSocket, "自动化部署完成")
return nil
}
// executeCommand 执行单个命令并返回输出
func (s *ServerHandler) executeCommand(command string) (string, error) {
return s.executeScript(command, "cmd_"+grand.S(8))
}
// ---------------- 主流程OnOpen严格顺序执行 ----------------
func (s *ServerHandler) OnOpen(socket *gws.Conn) {
// 1. 建立SSH连接
sshConfig := &ssh.ClientConfig{
User: s.ServerList.Account,
Auth: []ssh.AuthMethod{ssh.Password(s.ServerList.Password)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: 30 * time.Second,
}
// 连接服务器(重试)
var sshClient *ssh.Client
var err error
for retry := 0; retry < 2; retry++ {
sshClient, err = ssh.Dial("tcp", s.ServerList.LoginAddr, sshConfig)
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
if err != nil {
s.sendError(socket, "SSH连接失败"+err.Error())
return
}
// 创建会话(重试)
var session *ssh.Session
for retry := 0; retry < 2; retry++ {
session, err = sshClient.NewSession()
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "创建SSH会话失败"+err.Error())
return
}
// 配置PTY通用配置
modes := ssh.TerminalModes{
ssh.ECHO: 1,
ssh.TTY_OP_ISPEED: 115200,
ssh.TTY_OP_OSPEED: 115200,
}
err = session.RequestPty("vt100", 100, 80, modes)
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "PTY请求失败"+err.Error())
return
}
// 获取Stdin/Stdout
stdin, err := session.StdinPipe()
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "获取Stdin失败"+err.Error())
return
}
stdout, err := session.StdoutPipe()
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "获取Stdout失败"+err.Error())
return
}
// 启动Shell
err = session.Shell()
if err != nil {
session.Close()
sshClient.Close()
s.sendError(socket, "启动Shell失败"+err.Error())
return
}
// 初始化会话
s.session = &TerminalSession{
WebSocket: socket,
SSHClient: sshClient,
SSHSession: session,
Stdin: stdin,
Stdout: stdout,
Ready: true,
outputBuf: make(chan string, 2048), // 增大缓存
closed: atomic.Bool{},
}
// 启动输出转发协程
s.startOutputForwarding()
s.sendSuccess(socket, "SSH连接成功会话已就绪")
// ---------------- 严格按顺序执行自动化部署 ----------------
if s.isinstall == 1 {
// 执行完整的自动化部署脚本
err := s.executeFullDeployment()
if err != nil {
s.sendError(socket, "自动化部署失败: "+err.Error())
return
}
s.sendSuccess(socket, "自动化部署完成")
}
}
// startOutputForwarding 启动输出转发协程
func (s *ServerHandler) startOutputForwarding() {
if s.session == nil {
return
}
go func() {
defer func() {
if r := recover(); r != nil {
glog.Error(context.Background(), "Output forwarding goroutine panic:", r)
}
}()
scanner := bufio.NewScanner(s.session.Stdout)
for scanner.Scan() {
line := scanner.Text()
s.sendTerminalOutput(s.session.WebSocket, line+"\r\n")
}
if err := scanner.Err(); err != nil && s.session != nil {
glog.Error(context.Background(), "读取 SSH 输出失败:", err)
s.sendError(s.session.WebSocket, "读取 SSH 输出失败: "+err.Error())
}
}()
}
// ---------------- 基础函数 ----------------
func (s *ServerHandler) OnMessage(socket *gws.Conn, gwsmessage *gws.Message) {
if s.session == nil || s.session.Stdin == nil {
s.sendError(socket, "SSH 会话未建立")
return
}
_, err := s.session.Stdin.Write([]byte(gwsmessage.Data.Bytes()))
if err != nil {
s.sendError(socket, "写入消息失败:"+err.Error())
}
}
func (s *ServerHandler) OnClose(socket *gws.Conn, err error) {
glog.Debug(context.Background(), "连接断开:", err)
if s.session != nil {
s.session.closed.Store(true)
if s.session.SSHSession != nil {
s.session.SSHSession.Close()
}
if s.session.SSHClient != nil {
s.session.SSHClient.Close()
}
s.session = nil
}
}
// 发送终端输出
func (s *ServerHandler) sendTerminalOutput(ws *gws.Conn, output string) {
msg := WebSSHMessage{Type: "output", Payload: output}
data, _ := json.Marshal(msg)
ws.WriteMessage(gws.OpcodeText, data)
}
// 发送成功消息
func (s *ServerHandler) sendSuccess(ws *gws.Conn, msg string) {
res := WebSSHMessage{Type: "success", Payload: msg}
data, _ := json.Marshal(res)
ws.WriteMessage(gws.OpcodeText, data)
}
// 发送错误消息
func (s *ServerHandler) sendError(ws *gws.Conn, msg string) {
res := WebSSHMessage{Type: "error", Payload: msg}
data, _ := json.Marshal(res)
ws.WriteMessage(gws.OpcodeText, data)
}