websoft9/appmanage/api/v1/routers/apps.py
2023-04-06 09:18:30 +08:00

107 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Optional, List
from fastapi import APIRouter, status, Depends, Query
from pydantic import BaseModel, Field
from starlette.responses import JSONResponse
import os, io, sys, platform, shutil, time, subprocess, json, datetime
from api.model.app import App
from api.model.response import Response
from api.service import manage
from api.utils import shell_execute
from api.utils.common_log import myLogger
router = APIRouter()
rd1 = "code请求操作内部响应码\n\nmessage请求操作结果描述\n\ndata返回请求结果内容\n\n" \
"[\n\n" \
"  app_id应用ID,\n\n" \
"  name应用名,\n\n" \
"  customer_name自定义应用名,\n\n" \
"  trade_mark应用商标,\n\n" \
"  status_code应用运行状态码,\n\n"
status = '  status应用运行状态,\n\n'
status_detail = "  status应用运行状态,running:正常运行stop停止error错误\n\n"
status_list = "  status应用运行状态,waiting等待安装installing安装中running:正常运行stop停止error错误\n\n"
status_process = "  status应用运行状态,pulling拉取镜像creating容器启动inting容器初始化running:正常运行)\n\n"
rd2 = "  port应用端口,\n\n" \
"  volumeyml文件路径,\n\n" \
"  url应用网址,\n\n" \
"  image_url图片路径,\n\n" \
"  admin_url管理员网址,\n\n" \
"  user_name用户名,\n\n" \
"  password密码,\n\n" \
"  official_app是否为官方应用\n\n" \
"]"
rd = rd1 + status + rd2
rd_detail = rd1 + status_detail + rd2
rd_list = rd1 + status_list + rd2
rd_process = rd1 + status_process + rd2
@router.api_route("/details", methods=["GET", "POST"], summary="获取指定APP的信息",
response_description=rd_detail,
response_model=Response)
def app_detail(app_id: Optional[str] = Query(default=None, description="应用ID")):
myLogger.info_logger("Receive request: /api/v1/apps/details")
list = manage.get_app_detail(app_id)
return JSONResponse(list)
@router.api_route("", methods=["GET", "POST"], summary="获取所有APP的信息", response_description=rd_list,
response_model=Response)
def list_my_apps():
myLogger.info_logger("Receive request: /api/v1/apps")
list = manage.get_my_app()
return JSONResponse(content=list)
@router.api_route("/install", methods=["GET", "POST"], summary="安装APP", response_description=rd,
response_model=Response)
def install_app(app_name: Optional[str] = Query(default=None, description="应用名"),
customer_app_name: Optional[str] = Query(default=None, description="应用自定义名字"),
app_version: Optional[str] = Query(default=None, description="应用版本")):
myLogger.info_logger("Receive request: /api/v1/apps/install")
ret = manage.install_app(app_name, customer_app_name, app_version)
return JSONResponse(content=ret)
@router.api_route("/process", methods=["GET", "POST"], summary="获取指定APP的安装进度",
response_description=rd_process,
response_model=Response)
def install_app_process(app_id: Optional[str] = Query(default=None, description="应用ID")):
myLogger.info_logger("Receive request: /api/v1/apps/process")
ret = manage.install_app_process(app_id)
return JSONResponse(content=ret)
@router.api_route("/start", methods=["GET", "POST"], summary="启动APP", response_description=rd, response_model=Response)
def start_app(app_id: Optional[str] = Query(default=None, description="应用ID")):
myLogger.info_logger("Receive request: /api/v1/apps/start")
ret = manage.start_app(app_id)
return JSONResponse(content=ret)
@router.api_route("/stop", methods=["GET", "POST"], summary="停止APP", response_description=rd, response_model=Response)
def stop_app(app_id: Optional[str] = Query(default=None, description="应用ID")):
myLogger.info_logger("Receive request: /api/v1/apps/stop")
ret = manage.stop_app(app_id)
return JSONResponse(content=ret)
@router.api_route("/restart", methods=["GET", "POST"], summary="重启APP", response_description=rd,
response_model=Response)
def restart_app(app_id: Optional[str] = Query(default=None, description="应用ID")):
myLogger.info_logger("Receive request: /api/v1/apps/restart")
ret = manage.restart_app(app_id)
return JSONResponse(content=ret)
@router.api_route("/uninstall", methods=["GET", "POST"], summary="卸载APP", response_description=rd,
response_model=Response)
def uninstall_app(app_id: Optional[str] = Query(default=None, description="应用ID")):
myLogger.info_logger("Receive request: /api/v1/apps/uninstall")
ret = manage.uninstall_app(app_id)
return JSONResponse(content=ret)