Mirror of https://github.com/Websoft9/websoft9.git, synced 2025-02-03 09:48:38 +08:00
update
This commit is contained in:
parent 97173eb0d5
commit 23b44dad1d
0  appmanage_new/app/__init__.py  Normal file
17  appmanage_new/app/config/config.ini  Normal file
@@ -0,0 +1,17 @@
# nginx_proxy_manager is the base URL of Nginx Proxy Manager, used to configure the proxy
[nginx_proxy_manager]
base_url = http://websoft9-nginxproxymanager:81

# public_ip_url_list is a list of public-IP lookup URLs, used to get the public IP of the server
[public_ip_url_list]
url_list = https://api.ipify.org/,
    https://icanhazip.com/,
    http://ifconfig.co/,
    https://ident.me/,
    https://ifconfig.me/,
    https://ipecho.net/plain,
    https://ipinfo.io/ip,
    https://ip.sb/,
    http://whatismyip.akamai.com/,
    https://inet-ip.info/,
    http://bot.whatismyipaddress.com/
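
A short sketch of how this file is meant to be consumed (assuming the requests library; the exact call sites are not part of this commit). url_list is a multiline INI value, so it has to be split on commas and stripped before use:

import configparser
import requests

config = configparser.ConfigParser()
config.read("appmanage_new/app/config/config.ini")

# Split the multiline value into individual lookup URLs
urls = [u.strip() for u in config.get("public_ip_url_list", "url_list").split(",") if u.strip()]
for url in urls:
    try:
        print(requests.get(url, timeout=3).text.strip())  # the server's public IP
        break  # stop at the first service that answers
    except requests.RequestException:
        continue  # fall back to the next lookup service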
@ -1,4 +1,99 @@
|
||||
NGINX_URL = "http://websoft9-nginxproxymanager:81"
|
||||
# ARTIFACT_URL="https://artifact.azureedge.net/release/websoft9"
|
||||
ARTIFACT_URL = "https://w9artifact.blob.core.windows.net/release/websoft9"
|
||||
ARTIFACT_URL_DEV = "https://w9artifact.blob.core.windows.net/dev/websoft9"
|
||||
"""
|
||||
FileName: config.py
|
||||
Author: Jing.zhao
|
||||
Created: 2023-08-31
|
||||
Description: This script demonstrates how to managing configuration using configparser.
|
||||
|
||||
Modified by:
|
||||
Modified Date:
|
||||
Modification:
|
||||
"""
|
||||
|
||||
import configparser
|
||||
|
||||
|
||||
class ConfigManager:
|
||||
"""
|
||||
A class for managing configuration using configparser.
|
||||
|
||||
This class provides methods for reading, modifying, and saving configuration data using the configparser library.
|
||||
It allows getting, setting, and removing values from a configuration file.
|
||||
|
||||
Args:
|
||||
config_file_path (str): The path to the configuration file, default is "../config/config.ini".
|
||||
|
||||
Attributes:
|
||||
config_file_path (str): The path to the configuration file.
|
||||
config (configparser.ConfigParser): The configuration data in memory.
|
||||
"""
|
||||
|
||||
def __init__(self, config_file_path="../config/config.ini"):
|
||||
"""
|
||||
Initialize a ConfigManager instance.
|
||||
|
||||
Args:
|
||||
config_file_path (str): The path to the configuration file.
|
||||
"""
|
||||
self.config_file_path = config_file_path
|
||||
self.config = configparser.ConfigParser()
|
||||
self.config.read(self.config_file_path)
|
||||
|
||||
def _save_config(self):
|
||||
"""
|
||||
Save the configuration data to the file.
|
||||
|
||||
This method writes the current configuration data to the file specified during initialization.
|
||||
"""
|
||||
with open(self.config_file_path, 'w') as configfile:
|
||||
self.config.write(configfile)
|
||||
|
||||
def get_value(self, section, key):
|
||||
"""
|
||||
Get a value from the configuration.
|
||||
|
||||
Args:
|
||||
section (str): The section in the configuration.
|
||||
key (str): The key to retrieve the value for.
|
||||
|
||||
Returns:
|
||||
str: The value associated with the given section and key.
|
||||
"""
|
||||
return self.config.get(section, key)
|
||||
|
||||
def set_value(self, section, key, value):
|
||||
"""
|
||||
Set or update a value in the configuration.
|
||||
|
||||
Args:
|
||||
section (str): The section in the configuration.
|
||||
key (str): The key to set the value for.
|
||||
value (str): The value to set.
|
||||
"""
|
||||
if not self.config.has_section(section):
|
||||
self.config.add_section(section)
|
||||
self.config.set(section, key, value)
|
||||
self._save_config()
|
||||
|
||||
def remove_value(self, section, key):
|
||||
"""
|
||||
Remove a value from the configuration.
|
||||
|
||||
Args:
|
||||
section (str): The section in the configuration.
|
||||
key (str): The key to remove from the configuration.
|
||||
"""
|
||||
if self.config.has_section(section) and self.config.has_option(section, key):
|
||||
self.config.remove_option(section, key)
|
||||
self._save_config()
|
||||
|
||||
def remove_section(self, section):
|
||||
"""
|
||||
Remove a section from the configuration.
|
||||
Remove a section will Remove all configuration items under the section
|
||||
|
||||
Args:
|
||||
section (str): The section to remove from the configuration.
|
||||
"""
|
||||
if self.config.has_section(section):
|
||||
self.config.remove_section(section)
|
||||
self._save_config()
|
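
A minimal usage sketch for ConfigManager, assuming it is importable as app.core.config (the path used by nginx_proxy_manager_api.py later in this commit) and that config.ini sits at the default relative path:

from app.core.config import ConfigManager

cfg = ConfigManager()
base_url = cfg.get_value("nginx_proxy_manager", "base_url")  # "http://websoft9-nginxproxymanager:81"
cfg.set_value("nginx_proxy_manager", "api_token", "example-token")  # creates the key and saves the file
cfg.remove_value("nginx_proxy_manager", "api_token")  # removes it and saves again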
@@ -1,40 +0,0 @@
import logging
import os
from logging import handlers


class MyLogging():
    # init logging
    def __init__(self):
        # the file of log
        logPath = 'logs/'
        if not os.path.exists(logPath):
            os.makedirs(logPath)
        logName = 'app_manage.log'
        logFile = logPath + logName
        formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
        # handler
        time_rotating_file_handler = handlers.TimedRotatingFileHandler(filename=logFile, when="MIDNIGHT", interval=1, encoding='utf-8')
        time_rotating_file_handler.setLevel(logging.DEBUG)
        time_rotating_file_handler.setFormatter(formatter)
        # config
        logging.basicConfig(
            level=logging.DEBUG,
            handlers=[time_rotating_file_handler],
            datefmt='%Y-%m-%d %H:%M:%S',
            format='%(asctime)s %(levelname)s: %(message)s'
        )

    def info_logger(self, content):
        logging.info(content)

    def error_logger(self, content):
        logging.error(content)

    def debug_logger(self, content):
        logging.debug(content)

    def warning_logger(self, content):
        logging.warning(content)


myLogger = MyLogging()
93  appmanage_new/app/core/logger.py  Normal file
@@ -0,0 +1,93 @@
"""
FileName: logger.py
Author: Jing.zhao
Created: 2023-08-30
Description: This script defines a custom logger that creates and manages two types of logs for the application: 'access' and 'error'.

Modified by:
Modified Date:
Modification:
"""

import os
import logging
from logging.handlers import TimedRotatingFileHandler


class SingletonMeta(type):
    """Singleton metaclass to ensure only one instance of Logger"""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Create an instance if none exists, otherwise return the existing instance"""
        if cls not in cls._instances:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
        return cls._instances[cls]


class Logger(metaclass=SingletonMeta):
    """Custom Logger class for creating and managing two types of loggers: 'access' and 'error'

    Usage:
        from app.core.logger import logger

        # Use the 'access' logger to log info-level messages
        logger.access('This is an info message for access logs')

        # Use the 'error' logger to log error-level messages
        logger.error('This is an error message for error logs')
    """

    def __init__(self):
        """Initialize method to create 'access' and 'error' loggers"""
        self._access_logger = self._configure_logger("access")
        self._error_logger = self._configure_logger("error")

    def _configure_logger(self, log_type):
        """
        Configure the logger.

        Args:
            log_type (str): Type of the log, either 'access' or 'error'

        Returns:
            logger: Configured logger
        """
        logger = logging.getLogger(log_type)
        logger.setLevel(logging.DEBUG)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        log_folder = os.path.join(os.getcwd(), "logs")
        os.makedirs(log_folder, exist_ok=True)

        # TimedRotatingFileHandler appends the rotation date to the filename itself,
        # so the base filename only needs the log type.
        log_file = os.path.join(log_folder, f"{log_type}.log")

        file_handler = TimedRotatingFileHandler(
            filename=log_file,
            when="D",
            interval=1,
            backupCount=30,  # Keep logs for 30 days
            encoding="utf-8"
        )
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.DEBUG)

        logger.addHandler(file_handler)
        return logger

    @property
    def access(self):
        """Property exposing the 'access' logger's info method"""
        return self._access_logger.info

    @property
    def error(self):
        """Property exposing the 'error' logger's error method"""
        return self._error_logger.error


# Create Logger instance
logger = Logger()
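
A quick sketch of the singleton behaviour and the intended call pattern, assuming the module is importable as app.core.logger:

from app.core.logger import Logger, logger

assert Logger() is logger  # SingletonMeta always returns the same instance
logger.access("service started")       # info record in logs/access.log
logger.error("something went wrong")   # error record in logs/error.log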
@@ -1,12 +0,0 @@
#appstore_preview_update=false
#domain=test.websoft9.com

#email=help@websoft9.com
#ip=127.0.0.1
#smtp_port=743
#smtp_server=smtp.websoft9.com
#smtp_tls/ssl=true
#smtp_user=admin
#smtp_password=password
#install_path=/data
#artifact_url=https://w9artifact.blob.core.windows.net/release/websoft9
0  appmanage_new/app/external/gitea_api.py  vendored  Normal file
@@ -1,84 +0,0 @@
import requests


class NginxProxyManagerAPI:
    """
    This class provides methods to interact with the Nginx Proxy Manager API.

    Args:
        base_url (str): The base URL of the Nginx Proxy Manager API.
        api_token (str): The API token to use for authorization.

    Attributes:
        base_url (str): The base URL of the Nginx Proxy Manager API.
        api_token (str): The API token to use for authorization.

    Methods:
        get_token(identity, scope, secret): Request a new access token from Nginx Proxy Manager
        refresh_token(): Refresh your access token
    """

    def __init__(self, base_url, api_token):
        """
        Initialize the NginxProxyManagerAPI instance.

        Args:
            base_url (str): The base URL of the Nginx Proxy Manager API.
            api_token (str): The API token to use for authorization.
        """
        self.base_url = base_url
        self.api_token = api_token

    def get_token(self, identity, scope, secret):
        """
        Request a new access token from Nginx Proxy Manager

        Args:
            identity (str): user account with an email address
            scope (str): always "user"
            secret (str): user password

        Returns:
            dict or None: A dictionary containing token-related information if successful, otherwise None. The dictionary structure is as follows:
                If successful:
                {
                    "expires": str,  # Expiry timestamp of the token
                    "token": str     # The access token
                }
                If unsuccessful:
                None
        """
        url = f"{self.base_url}/api/tokens"
        headers = {"Content-Type": "application/json"}
        data = {
            "identity": identity,
            "scope": scope,
            "secret": secret
        }
        response = requests.post(url, json=data, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            return None

    def refresh_token(self):
        """
        Refresh your access token

        Returns:
            dict or None: A dictionary containing token-related information if successful, otherwise None. The dictionary structure is as follows:
                If successful:
                {
                    "expires": str,  # Expiry timestamp of the token
                    "token": str     # The access token
                }
                If unsuccessful:
                None
        """
        url = f"{self.base_url}/api/tokens"
        headers = {"Authorization": f"Bearer {self.api_token}"}
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            return None
263  appmanage_new/app/external/nginx_proxy_manager_api.py  vendored  Normal file
@@ -0,0 +1,263 @@

import requests
from typing import List, Union
from app.core.config import ConfigManager


class NginxProxyManagerAPI:
    """
    This class provides methods to interact with the Nginx Proxy Manager API.

    Attributes:
        base_url (str): The base URL of the Nginx Proxy Manager API.
        api_token (str): The API token to use for authorization.

    Methods:
        get_token(identity: str, secret: str): Request a new access token
        refresh_token(): Refresh your access token
        get_proxy_hosts(): Get all proxy hosts
        create_proxy_host(domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str): Create a new proxy host
        update_proxy_host(proxy_id: int, domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str): Update an existing proxy host
        delete_proxy_host(proxy_id: int): Delete a proxy host
    """

    def __init__(self):
        """
        Initialize the NginxProxyManagerAPI instance.
        """
        self.base_url = ConfigManager().get_value("nginx_proxy_manager", "base_url")
        self.api_token = None

    def get_token(self, identity: str, secret: str) -> Union[dict, None]:
        """
        Request a new access token

        Args:
            identity (str): user account with an email address
            secret (str): user password

        Returns:
            dict or None: A dictionary containing token-related information if successful, otherwise None. The dictionary structure is as follows:
                If successful:
                {
                    "expires": str,  # Expiry timestamp of the token
                    "token": str     # The access token
                }
                If unsuccessful:
                None
        """
        url = f"{self.base_url}/api/tokens"
        headers = {
            'Content-Type': 'application/json'
        }
        json = {
            "identity": identity,
            "scope": "user",
            "secret": secret
        }
        response = requests.post(url, json=json, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            return None

    def refresh_token(self) -> Union[dict, None]:
        """
        Refresh your access token

        Returns:
            dict or None: A dictionary containing token-related information if successful, otherwise None. The dictionary structure is as follows:
                If successful:
                {
                    "expires": str,  # Expiry timestamp of the token
                    "token": str     # The access token
                }
                If unsuccessful:
                None
        """
        url = f"{self.base_url}/api/tokens"
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {self.api_token}"
        }
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            return None

    def get_proxy_hosts(self) -> Union[List[dict], None]:
        """
        Get all proxy hosts

        Returns:
            list or None: If the retrieval is successful, returns a list of dictionaries containing proxy host information, where each dictionary includes:
                - "proxy_id": The ID of the proxy host.
                - "forward_host": The target host name of the proxy.
                - "domain_names": A list of domain names associated with the proxy host.
            Returns None if the retrieval fails.
        """
        url = f"{self.base_url}/api/nginx/proxy-hosts"
        params = {"expand": "owner,access_list,certificate"}
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {self.api_token}"
        }
        response = requests.get(url, params=params, headers=headers)
        if response.status_code == 200:
            proxy_hosts = response.json()
            result_list = [
                {
                    "proxy_id": proxy["id"],
                    "forward_host": proxy["forward_host"],
                    "domain_names": proxy["domain_names"]
                }
                for proxy in proxy_hosts
            ]
            return result_list
        else:
            return None

    def create_proxy_host(self, domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str) -> Union[dict, None]:
        """
        Create a new proxy host

        Args:
            domain_names (List[str]): List of domain names associated with the proxy host.
            forward_scheme (str): The scheme (HTTP or HTTPS) for forwarding traffic.
            forward_host (str): The target host to which traffic will be forwarded.
            forward_port (int): The port on the target host to which traffic will be forwarded.
            advanced_config (str): Advanced configuration options for the proxy host.

        Returns:
            dict or None: If the proxy host creation is successful,
            returns a dictionary containing information about the created proxy host with the following fields:
                - "proxy_id": The ID of the created proxy host.
                - "forward_host": The target host name of the proxy.
                - "domain_names": A list of domain names associated with the proxy host.
            Returns None if the proxy host creation fails.
        """
        url = f"{self.base_url}/api/nginx/proxy-hosts"
        json = {
            "domain_names": domain_names,
            "forward_scheme": forward_scheme,
            "forward_host": forward_host,
            "forward_port": forward_port,
            "access_list_id": "0",
            "certificate_id": 0,
            "meta": {
                "letsencrypt_agree": False,
                "dns_challenge": False
            },
            "advanced_config": advanced_config,
            "block_exploits": False,
            "caching_enabled": False,
            "allow_websocket_upgrade": False,
            "http2_support": False,
            "hsts_enabled": False,
            "hsts_subdomains": False,
            "ssl_forced": False,
            "locations": [],
        }
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {self.api_token}"
        }
        response = requests.post(url, json=json, headers=headers)
        if response.status_code == 201:
            proxy_hosts = response.json()
            proxy_id = proxy_hosts.get("id")
            domain_names = proxy_hosts.get("domain_names")
            forward_host = proxy_hosts.get("forward_host")
            result_dict = {
                "proxy_id": proxy_id,
                "forward_host": forward_host,
                "domain_names": domain_names
            }
            return result_dict
        else:
            return None

    def update_proxy_host(self, proxy_id: int, domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str) -> Union[dict, None]:
        """
        Update an existing proxy host.

        Args:
            proxy_id (int): The ID of the proxy host to be updated.
            domain_names (List[str]): List of updated domain names associated with the proxy host.
            forward_scheme (str): The updated scheme (HTTP or HTTPS) for forwarding traffic.
            forward_host (str): The updated target host to which traffic will be forwarded.
            forward_port (int): The updated port on the target host to which traffic will be forwarded.
            advanced_config (str): Updated advanced configuration options for the proxy host.

        Returns:
            dict or None: If the proxy host update is successful,
            returns a dictionary containing information about the updated proxy host with the following fields:
                - "proxy_id": The ID of the updated proxy host.
                - "forward_host": The target host name of the proxy after the update.
                - "domain_names": A list of updated domain names associated with the proxy host.
            Returns None if the proxy host update fails.
        """
        url = f"{self.base_url}/api/nginx/proxy-hosts/{proxy_id}"
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {self.api_token}"
        }
        json = {
            "domain_names": domain_names,
            "forward_scheme": forward_scheme,
            "forward_host": forward_host,
            "forward_port": forward_port,
            "access_list_id": "0",
            "certificate_id": 0,
            "meta": {
                "letsencrypt_agree": False,
                "dns_challenge": False
            },
            "advanced_config": advanced_config,
            "block_exploits": False,
            "caching_enabled": False,
            "allow_websocket_upgrade": False,
            "http2_support": False,
            "hsts_enabled": False,
            "hsts_subdomains": False,
            "ssl_forced": False,
            "locations": [],
        }
        response = requests.put(url, json=json, headers=headers)
        if response.status_code == 200:
            proxy_hosts = response.json()
            proxy_id = proxy_hosts.get("id")
            domain_names = proxy_hosts.get("domain_names")
            forward_host = proxy_hosts.get("forward_host")
            result_dict = {
                "proxy_id": proxy_id,
                "forward_host": forward_host,
                "domain_names": domain_names
            }
            return result_dict
        else:
            return None

    def delete_proxy_host(self, proxy_id: int) -> Union[requests.Response, None]:
        """
        Delete a proxy host

        Args:
            proxy_id (int): The ID of the proxy host to be deleted.

        Returns:
            requests.Response or None: Returns the response object if the proxy host is successfully deleted,
            indicating a successful deletion. Returns None if the deletion fails.
        """
        url = f"{self.base_url}/api/nginx/proxy-hosts/{proxy_id}"
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {self.api_token}"
        }
        response = requests.delete(url, headers=headers)
        if response.status_code == 200:
            return response
        return None
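
A minimal end-to-end sketch of how this client is meant to be used; the credentials and domain below are placeholders, and the calling code is not part of this commit:

npm = NginxProxyManagerAPI()
token_info = npm.get_token("admin@example.com", "password")
if token_info:
    npm.api_token = token_info["token"]  # subsequent calls send this as a Bearer token
    created = npm.create_proxy_host(
        domain_names=["app.example.com"],
        forward_scheme="http",
        forward_host="websoft9-myapp",
        forward_port=80,
        advanced_config="",
    )
    # e.g. {"proxy_id": 1, "forward_host": "websoft9-myapp", "domain_names": ["app.example.com"]}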
0  appmanage_new/app/external/portainer_api.py  vendored  Normal file
@@ -1,519 +1,519 @@
# Merge the app lists
def conbine_list(installing_list, installed_list):
    app_list = installing_list + installed_list
    result_list = []
    appid_list = []
    for app in app_list:
        app_id = app['app_id']
        if app_id in appid_list:
            continue
        else:
            appid_list.append(app_id)
            result_list.append(app)
    return result_list
# # Merge the app lists
# def conbine_list(installing_list, installed_list):
#     app_list = installing_list + installed_list
#     result_list = []
#     appid_list = []
#     for app in app_list:
#         app_id = app['app_id']
#         if app_id in appid_list:
#             continue
#         else:
#             appid_list.append(app_id)
#             result_list.append(app)
#     return result_list
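# A tiny illustration of conbine_list's dedup behaviour, with hypothetical data:
# entries from installing_list win because they come first in the concatenated list.
#
#   installing = [{"app_id": "wordpress_w1", "status": "installing"}]
#   installed = [{"app_id": "wordpress_w1", "status": "running"},
#                {"app_id": "mysql_w2", "status": "running"}]
#   conbine_list(installing, installed)
#   # -> [{"app_id": "wordpress_w1", "status": "installing"},
#   #     {"app_id": "mysql_w2", "status": "running"}]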

# Get information for all apps
def get_my_app(app_id):
    installed_list = get_apps_from_compose()
    installing_list = get_apps_from_queue()
# # Get information for all apps
# def get_my_app(app_id):
#     installed_list = get_apps_from_compose()
#     installing_list = get_apps_from_queue()

    app_list = conbine_list(installing_list, installed_list)
    find = False
    ret = {}
    if app_id != None:
        for app in app_list:
            if app_id == app['app_id']:
                ret = app
                find = True
                break
        if not find:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "This App doesn't exist!", "")
    else:
        ret = app_list
    myLogger.info_logger("app list result ok")
    return ret
#     app_list = conbine_list(installing_list, installed_list)
#     find = False
#     ret = {}
#     if app_id != None:
#         for app in app_list:
#             if app_id == app['app_id']:
#                 ret = app
#                 find = True
#                 break
#         if not find:
#             raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "This App doesn't exist!", "")
#     else:
#         ret = app_list
#     myLogger.info_logger("app list result ok")
#     return ret

def get_apps_from_compose():
    myLogger.info_logger("Search all apps ...")
    cmd = "docker compose ls -a --format json"
    output = shell_execute.execute_command_output_all(cmd)
    output_list = json.loads(output["result"])
    myLogger.info_logger(len(output_list))
    ip = "localhost"
    try:
        ip_result = shell_execute.execute_command_output_all("cat /data/apps/w9services/w9appmanage/public_ip")
        ip = ip_result["result"].rstrip('\n')
    except Exception:
        ip = "127.0.0.1"
# def get_apps_from_compose():
#     myLogger.info_logger("Search all apps ...")
#     cmd = "docker compose ls -a --format json"
#     output = shell_execute.execute_command_output_all(cmd)
#     output_list = json.loads(output["result"])
#     myLogger.info_logger(len(output_list))
#     ip = "localhost"
#     try:
#         ip_result = shell_execute.execute_command_output_all("cat /data/apps/w9services/w9appmanage/public_ip")
#         ip = ip_result["result"].rstrip('\n')
#     except Exception:
#         ip = "127.0.0.1"

    app_list = []
    for app_info in output_list:
        volume = app_info["ConfigFiles"]
        app_path = volume.rsplit('/', 1)[0]
        customer_name = volume.split('/')[-2]
        app_id = ""
        app_name = ""
        trade_mark = ""
        port = 0
        url = ""
        admin_url = ""
        image_url = ""
        user_name = ""
        password = ""
        official_app = False
        app_version = ""
        create_time = ""
        volume_data = ""
        config_path = app_path
        app_https = False
        app_replace_url = False
        default_domain = ""
        admin_path = ""
        admin_domain_url = ""
        if customer_name in ['w9appmanage', 'w9nginxproxymanager', 'w9redis', 'w9kopia',
                             'w9portainer'] or app_path == '/data/apps/w9services/' + customer_name:
            continue
#     app_list = []
#     for app_info in output_list:
#         volume = app_info["ConfigFiles"]
#         app_path = volume.rsplit('/', 1)[0]
#         customer_name = volume.split('/')[-2]
#         app_id = ""
#         app_name = ""
#         trade_mark = ""
#         port = 0
#         url = ""
#         admin_url = ""
#         image_url = ""
#         user_name = ""
#         password = ""
#         official_app = False
#         app_version = ""
#         create_time = ""
#         volume_data = ""
#         config_path = app_path
#         app_https = False
#         app_replace_url = False
#         default_domain = ""
#         admin_path = ""
#         admin_domain_url = ""
#         if customer_name in ['w9appmanage', 'w9nginxproxymanager', 'w9redis', 'w9kopia',
#                              'w9portainer'] or app_path == '/data/apps/w9services/' + customer_name:
#             continue

        var_path = app_path + "/variables.json"
        official_app = check_if_official_app(var_path)
#         var_path = app_path + "/variables.json"
#         official_app = check_if_official_app(var_path)

        status_show = app_info["Status"]
        status = app_info["Status"].split("(")[0]
        if status == "running" or status == "exited" or status == "restarting":
            if "exited" in status_show and "running" in status_show:
                if status == "exited":
                    cmd = "docker ps -a -f name=" + customer_name + " --format {{.Names}}#{{.Status}}|grep Exited"
                    result = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
                    container = result.split("#Exited")[0]
                    if container != customer_name:
                        status = "running"
            if "restarting" in status_show:
                about_time = get_createtime(official_app, app_path, customer_name)
                if "seconds" in about_time:
                    status = "restarting"
                else:
                    status = "failed"
        elif status == "created":
            status = "failed"
        else:
            continue
#         status_show = app_info["Status"]
#         status = app_info["Status"].split("(")[0]
#         if status == "running" or status == "exited" or status == "restarting":
#             if "exited" in status_show and "running" in status_show:
#                 if status == "exited":
#                     cmd = "docker ps -a -f name=" + customer_name + " --format {{.Names}}#{{.Status}}|grep Exited"
#                     result = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
#                     container = result.split("#Exited")[0]
#                     if container != customer_name:
#                         status = "running"
#             if "restarting" in status_show:
#                 about_time = get_createtime(official_app, app_path, customer_name)
#                 if "seconds" in about_time:
#                     status = "restarting"
#                 else:
#                     status = "failed"
#         elif status == "created":
#             status = "failed"
#         else:
#             continue

        if official_app:
            app_name = docker.read_var(var_path, 'name')
            app_id = app_name + "_" + customer_name  # app_id
            # get trade_mark
            trade_mark = docker.read_var(var_path, 'trademark')
            image_url = get_Image_url(app_name)
            # get env info
            path = app_path + "/.env"
            env_map = docker.get_map(path)
#         if official_app:
#             app_name = docker.read_var(var_path, 'name')
#             app_id = app_name + "_" + customer_name  # app_id
#             # get trade_mark
#             trade_mark = docker.read_var(var_path, 'trademark')
#             image_url = get_Image_url(app_name)
#             # get env info
#             path = app_path + "/.env"
#             env_map = docker.get_map(path)

            try:
                myLogger.info_logger("get domain for APP_URL")
                domain = env_map.get("APP_URL")
                if "appname.example.com" in domain or ip in domain:
                    default_domain = ""
                else:
                    default_domain = domain
            except Exception:
                myLogger.info_logger("domain exception")
            try:
                app_version = env_map.get("APP_VERSION")
                volume_data = "/data/apps/" + customer_name + "/data"
                user_name = env_map.get("APP_USER", "")
                password = env_map.get("POWER_PASSWORD", "")
                admin_path = env_map.get("APP_ADMIN_PATH")
                if admin_path:
                    myLogger.info_logger(admin_path)
                    admin_path = admin_path.replace("\"", "")
                else:
                    admin_path = ""
#             try:
#                 myLogger.info_logger("get domain for APP_URL")
#                 domain = env_map.get("APP_URL")
#                 if "appname.example.com" in domain or ip in domain:
#                     default_domain = ""
#                 else:
#                     default_domain = domain
#             except Exception:
#                 myLogger.info_logger("domain exception")
#             try:
#                 app_version = env_map.get("APP_VERSION")
#                 volume_data = "/data/apps/" + customer_name + "/data"
#                 user_name = env_map.get("APP_USER", "")
#                 password = env_map.get("POWER_PASSWORD", "")
#                 admin_path = env_map.get("APP_ADMIN_PATH")
#                 if admin_path:
#                     myLogger.info_logger(admin_path)
#                     admin_path = admin_path.replace("\"", "")
#                 else:
#                     admin_path = ""

                if default_domain != "" and admin_path != "":
                    admin_domain_url = "http://" + default_domain + admin_path
            except Exception:
                myLogger.info_logger("APP_USER POWER_PASSWORD exception")
            try:
                replace = env_map.get("APP_URL_REPLACE", "false")
                myLogger.info_logger("replace=" + replace)
                if replace == "true":
                    app_replace_url = True
                https = env_map.get("APP_HTTPS_ACCESS", "false")
                if https == "true":
                    app_https = True
            except Exception:
                myLogger.info_logger("APP_HTTPS_ACCESS exception")
#                 if default_domain != "" and admin_path != "":
#                     admin_domain_url = "http://" + default_domain + admin_path
#             except Exception:
#                 myLogger.info_logger("APP_USER POWER_PASSWORD exception")
#             try:
#                 replace = env_map.get("APP_URL_REPLACE", "false")
#                 myLogger.info_logger("replace=" + replace)
#                 if replace == "true":
#                     app_replace_url = True
#                 https = env_map.get("APP_HTTPS_ACCESS", "false")
#                 if https == "true":
#                     app_https = True
#             except Exception:
#                 myLogger.info_logger("APP_HTTPS_ACCESS exception")

            try:
                http_port = env_map.get("APP_HTTP_PORT", "0")
                if http_port:
                    port = int(http_port)
            except Exception:
                pass
            if port != 0:
                try:
                    if app_https:
                        easy_url = "https://" + ip + ":" + str(port)
                    else:
                        easy_url = "http://" + ip + ":" + str(port)
                    url = easy_url
                    admin_url = get_admin_url(customer_name, url)
                except Exception:
                    pass
            else:
                try:
                    db_port = list(docker.read_env(path, "APP_DB.*_PORT").values())[0]
                    port = int(db_port)
                except Exception:
                    pass
        else:
            app_name = customer_name
            app_id = customer_name + "_" + customer_name
        create_time = get_createtime(official_app, app_path, customer_name)
        if status in ['running', 'exited']:
            config = Config(port=port, compose_file=volume, url=url, admin_url=admin_url,
                            admin_domain_url=admin_domain_url,
                            admin_path=admin_path, admin_username=user_name, admin_password=password,
                            default_domain=default_domain)
        else:
            config = None
        if status == "failed":
            status_reason = StatusReason(Code=const.ERROR_SERVER_SYSTEM, Message="system original error",
                                         Detail="unknown error")
        else:
            status_reason = None
        app = App(app_id=app_id, app_name=app_name, customer_name=customer_name, trade_mark=trade_mark,
                  app_version=app_version, create_time=create_time, volume_data=volume_data, config_path=config_path,
                  status=status, status_reason=status_reason, official_app=official_app, image_url=image_url,
                  app_https=app_https, app_replace_url=app_replace_url, config=config)
#             try:
#                 http_port = env_map.get("APP_HTTP_PORT", "0")
#                 if http_port:
#                     port = int(http_port)
#             except Exception:
#                 pass
#             if port != 0:
#                 try:
#                     if app_https:
#                         easy_url = "https://" + ip + ":" + str(port)
#                     else:
#                         easy_url = "http://" + ip + ":" + str(port)
#                     url = easy_url
#                     admin_url = get_admin_url(customer_name, url)
#                 except Exception:
#                     pass
#             else:
#                 try:
#                     db_port = list(docker.read_env(path, "APP_DB.*_PORT").values())[0]
#                     port = int(db_port)
#                 except Exception:
#                     pass
#         else:
#             app_name = customer_name
#             app_id = customer_name + "_" + customer_name
#         create_time = get_createtime(official_app, app_path, customer_name)
#         if status in ['running', 'exited']:
#             config = Config(port=port, compose_file=volume, url=url, admin_url=admin_url,
#                             admin_domain_url=admin_domain_url,
#                             admin_path=admin_path, admin_username=user_name, admin_password=password,
#                             default_domain=default_domain)
#         else:
#             config = None
#         if status == "failed":
#             status_reason = StatusReason(Code=const.ERROR_SERVER_SYSTEM, Message="system original error",
#                                          Detail="unknown error")
#         else:
#             status_reason = None
#         app = App(app_id=app_id, app_name=app_name, customer_name=customer_name, trade_mark=trade_mark,
#                   app_version=app_version, create_time=create_time, volume_data=volume_data, config_path=config_path,
#                   status=status, status_reason=status_reason, official_app=official_app, image_url=image_url,
#                   app_https=app_https, app_replace_url=app_replace_url, config=config)

        app_list.append(app.dict())
    return app_list
#         app_list.append(app.dict())
#     return app_list
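# For reference, `docker compose ls -a --format json` emits records like the
# hypothetical one below; the code above keys off ConfigFiles and Status:
#
#   {"Name": "mywordpress", "Status": "running(3)",
#    "ConfigFiles": "/data/apps/mywordpress/docker-compose.yml"}
#
#   volume.rsplit('/', 1)[0]          -> "/data/apps/mywordpress"  (app_path)
#   volume.split('/')[-2]             -> "mywordpress"             (customer_name)
#   app_info["Status"].split("(")[0]  -> "running"                 (status)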

# Install
def install_app(app_name, customer_name, app_version):
    myLogger.info_logger("Install app ...")
    ret = {}
    ret['ResponseData'] = {}
    app_id = app_name + "_" + customer_name
    ret['ResponseData']['app_id'] = app_id
# # Install
# def install_app(app_name, customer_name, app_version):
#     myLogger.info_logger("Install app ...")
#     ret = {}
#     ret['ResponseData'] = {}
#     app_id = app_name + "_" + customer_name
#     ret['ResponseData']['app_id'] = app_id

    code, message = check_app(app_name, customer_name, app_version)
    if code == None:
        q.enqueue(install_app_delay, app_name, customer_name, app_version, job_id=app_id)
    else:
        ret['Error'] = get_error_info(code, message, "")
#     code, message = check_app(app_name, customer_name, app_version)
#     if code == None:
#         q.enqueue(install_app_delay, app_name, customer_name, app_version, job_id=app_id)
#     else:
#         ret['Error'] = get_error_info(code, message, "")

    return ret
#     return ret

def start_app(app_id):
    info, flag = app_exits_in_docker(app_id)
    if flag:
        app_path = info.split()[-1].rsplit('/', 1)[0]
        cmd = "docker compose -f " + app_path + "/docker-compose.yml start"
        shell_execute.execute_command_output_all(cmd)
    else:
        raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "App does not exist", "")
# def start_app(app_id):
#     info, flag = app_exits_in_docker(app_id)
#     if flag:
#         app_path = info.split()[-1].rsplit('/', 1)[0]
#         cmd = "docker compose -f " + app_path + "/docker-compose.yml start"
#         shell_execute.execute_command_output_all(cmd)
#     else:
#         raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "App does not exist", "")


def stop_app(app_id):
    info, flag = app_exits_in_docker(app_id)
    if flag:
        app_path = info.split()[-1].rsplit('/', 1)[0]
        cmd = "docker compose -f " + app_path + "/docker-compose.yml stop"
        shell_execute.execute_command_output_all(cmd)
    else:
        raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "App does not exist", "")
# def stop_app(app_id):
#     info, flag = app_exits_in_docker(app_id)
#     if flag:
#         app_path = info.split()[-1].rsplit('/', 1)[0]
#         cmd = "docker compose -f " + app_path + "/docker-compose.yml stop"
#         shell_execute.execute_command_output_all(cmd)
#     else:
#         raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "App does not exist", "")


def restart_app(app_id):
    code, message = docker.check_app_id(app_id)
    if code == None:
        info, flag = app_exits_in_docker(app_id)
        if flag:
            app_path = info.split()[-1].rsplit('/', 1)[0]
            cmd = "docker compose -f " + app_path + "/docker-compose.yml restart"
            shell_execute.execute_command_output_all(cmd)
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "App does not exist", "")
    else:
        raise CommandException(code, message, "")
# def restart_app(app_id):
#     code, message = docker.check_app_id(app_id)
#     if code == None:
#         info, flag = app_exits_in_docker(app_id)
#         if flag:
#             app_path = info.split()[-1].rsplit('/', 1)[0]
#             cmd = "docker compose -f " + app_path + "/docker-compose.yml restart"
#             shell_execute.execute_command_output_all(cmd)
#         else:
#             raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "App does not exist", "")
#     else:
#         raise CommandException(code, message, "")

def uninstall_app(app_id):
    app_name = app_id.split('_')[0]
    customer_name = app_id.split('_')[1]
    app_path = ""
    info, code_exist = app_exits_in_docker(app_id)
    if code_exist:
        app_path = info.split()[-1].rsplit('/', 1)[0]
        cmd = "docker compose -f " + app_path + "/docker-compose.yml down -v"
        lib_path = '/data/library/apps/' + app_name
        if app_path != lib_path:
            cmd = cmd + " && sudo rm -rf " + app_path
        shell_execute.execute_command_output_all(cmd)
    else:
        if check_app_rq(app_id):
            delete_app_failedjob(app_id)
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "AppID does not exist", "")
    # Force to delete docker compose
    try:
        cmd = " sudo rm -rf /data/apps/" + customer_name
        shell_execute.execute_command_output_all(cmd)
    except CommandException as ce:
        myLogger.info_logger("Delete app compose exception")
    # Delete proxy config when uninstalling the app
    app_proxy_delete(app_id)
# def uninstall_app(app_id):
#     app_name = app_id.split('_')[0]
#     customer_name = app_id.split('_')[1]
#     app_path = ""
#     info, code_exist = app_exits_in_docker(app_id)
#     if code_exist:
#         app_path = info.split()[-1].rsplit('/', 1)[0]
#         cmd = "docker compose -f " + app_path + "/docker-compose.yml down -v"
#         lib_path = '/data/library/apps/' + app_name
#         if app_path != lib_path:
#             cmd = cmd + " && sudo rm -rf " + app_path
#         shell_execute.execute_command_output_all(cmd)
#     else:
#         if check_app_rq(app_id):
#             delete_app_failedjob(app_id)
#         else:
#             raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "AppID does not exist", "")
#     # Force to delete docker compose
#     try:
#         cmd = " sudo rm -rf /data/apps/" + customer_name
#         shell_execute.execute_command_output_all(cmd)
#     except CommandException as ce:
#         myLogger.info_logger("Delete app compose exception")
#     # Delete proxy config when uninstalling the app
#     app_proxy_delete(app_id)


# Handling after a failed install
def delete_app(app_id):
    try:
        app_name = app_id.split('_')[0]
        customer_name = app_id.split('_')[1]
        app_path = ""
        info, code_exist = app_exits_in_docker(app_id)
        if code_exist:
            app_path = info.split()[-1].rsplit('/', 1)[0]
            cmd = "docker compose -f " + app_path + "/docker-compose.yml down -v"
            lib_path = '/data/library/apps/' + app_name
            if app_path != lib_path:
                cmd = cmd + " && sudo rm -rf " + app_path
            try:
                myLogger.info_logger("Install failed, bring the app down and delete files")
                shell_execute.execute_command_output_all(cmd)
            except Exception:
                myLogger.info_logger("Delete app compose exception")
            # Force-delete containers that failed and cannot be removed via docker compose down
            try:
                myLogger.info_logger("If delete fails, force-delete the containers")
                force_cmd = "docker rm -f $(docker ps -f name=^" + customer_name + " -aq)"
                shell_execute.execute_command_output_all(force_cmd)
            except Exception:
                myLogger.info_logger("force delete app compose exception")
# # Handling after a failed install
# def delete_app(app_id):
#     try:
#         app_name = app_id.split('_')[0]
#         customer_name = app_id.split('_')[1]
#         app_path = ""
#         info, code_exist = app_exits_in_docker(app_id)
#         if code_exist:
#             app_path = info.split()[-1].rsplit('/', 1)[0]
#             cmd = "docker compose -f " + app_path + "/docker-compose.yml down -v"
#             lib_path = '/data/library/apps/' + app_name
#             if app_path != lib_path:
#                 cmd = cmd + " && sudo rm -rf " + app_path
#             try:
#                 myLogger.info_logger("Install failed, bring the app down and delete files")
#                 shell_execute.execute_command_output_all(cmd)
#             except Exception:
#                 myLogger.info_logger("Delete app compose exception")
#             # Force-delete containers that failed and cannot be removed via docker compose down
#             try:
#                 myLogger.info_logger("If delete fails, force-delete the containers")
#                 force_cmd = "docker rm -f $(docker ps -f name=^" + customer_name + " -aq)"
#                 shell_execute.execute_command_output_all(force_cmd)
#             except Exception:
#                 myLogger.info_logger("force delete app compose exception")

        else:
            if check_app_rq(app_id):
                delete_app_failedjob(app_id)
            else:
                raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "AppID does not exist", "")
        cmd = " sudo rm -rf /data/apps/" + customer_name
        shell_execute.execute_command_output_all(cmd)
    except CommandException as ce:
        myLogger.info_logger("Delete app compose exception")
#         else:
#             if check_app_rq(app_id):
#                 delete_app_failedjob(app_id)
#             else:
#                 raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "AppID does not exist", "")
#         cmd = " sudo rm -rf /data/apps/" + customer_name
#         shell_execute.execute_command_output_all(cmd)
#     except CommandException as ce:
#         myLogger.info_logger("Delete app compose exception")

# Prepare for installation
def prepare_app(app_name, customer_name):
    library_path = "/data/library/apps/" + app_name
    install_path = "/data/apps/" + customer_name
    shell_execute.execute_command_output_all("cp -r " + library_path + " " + install_path)
# # Prepare for installation
# def prepare_app(app_name, customer_name):
#     library_path = "/data/library/apps/" + app_name
#     install_path = "/data/apps/" + customer_name
#     shell_execute.execute_command_output_all("cp -r " + library_path + " " + install_path)


def install_app_delay(app_name, customer_name, app_version):
    myLogger.info_logger("-------RQ install start --------")
    job_id = app_name + "_" + customer_name
# def install_app_delay(app_name, customer_name, app_version):
#     myLogger.info_logger("-------RQ install start --------")
#     job_id = app_name + "_" + customer_name

    try:
        # The folder has not been copied yet at this point; the resource check reads JSON from files under /data/library, so it must use app_name, not customer_name
        resource_flag = docker.check_vm_resource(app_name)
#     try:
#         # The folder has not been copied yet at this point; the resource check reads JSON from files under /data/library, so it must use app_name, not customer_name
#         resource_flag = docker.check_vm_resource(app_name)

        if resource_flag == True:
#         if resource_flag == True:

            myLogger.info_logger("job check ok, continue to install app")
            env_path = "/data/apps/" + customer_name + "/.env"
            # prepare_app(app_name, customer_name)
            docker.check_app_compose(app_name, customer_name)
            myLogger.info_logger("start JobID=" + job_id)
            docker.modify_env(env_path, 'APP_NAME', customer_name)
            docker.modify_env(env_path, "APP_VERSION", app_version)
            docker.check_app_url(customer_name)
            cmd = "cd /data/apps/" + customer_name + " && sudo docker compose pull && sudo docker compose up -d"
            output = shell_execute.execute_command_output_all(cmd)
            myLogger.info_logger("-------Install result--------")
            myLogger.info_logger(output["code"])
            myLogger.info_logger(output["result"])
            try:
                shell_execute.execute_command_output_all("bash /data/apps/" + customer_name + "/src/after_up.sh")
            except Exception as e:
                myLogger.info_logger(str(e))
        else:
            error_info = "##websoft9##" + const.ERROR_SERVER_RESOURCE + "##websoft9##" + "Insufficient system resources (cpu, memory, disk space)" + "##websoft9##" + "Insufficient system resources (cpu, memory, disk space)"
            myLogger.info_logger(error_info)
            raise Exception(error_info)
    except CommandException as ce:
        myLogger.info_logger(customer_name + " install failed(docker)!")
        delete_app(job_id)
        error_info = "##websoft9##" + ce.code + "##websoft9##" + ce.message + "##websoft9##" + ce.detail
        myLogger.info_logger(error_info)
        raise Exception(error_info)
    except Exception as e:
        myLogger.info_logger(customer_name + " install failed(system)!")
        delete_app(job_id)
        error_info = "##websoft9##" + const.ERROR_SERVER_SYSTEM + "##websoft9##" + 'system original error' + "##websoft9##" + str(
            e)
        myLogger.info_logger(error_info)
        raise Exception(error_info)
#             myLogger.info_logger("job check ok, continue to install app")
#             env_path = "/data/apps/" + customer_name + "/.env"
#             # prepare_app(app_name, customer_name)
#             docker.check_app_compose(app_name, customer_name)
#             myLogger.info_logger("start JobID=" + job_id)
#             docker.modify_env(env_path, 'APP_NAME', customer_name)
#             docker.modify_env(env_path, "APP_VERSION", app_version)
#             docker.check_app_url(customer_name)
#             cmd = "cd /data/apps/" + customer_name + " && sudo docker compose pull && sudo docker compose up -d"
#             output = shell_execute.execute_command_output_all(cmd)
#             myLogger.info_logger("-------Install result--------")
#             myLogger.info_logger(output["code"])
#             myLogger.info_logger(output["result"])
#             try:
#                 shell_execute.execute_command_output_all("bash /data/apps/" + customer_name + "/src/after_up.sh")
#             except Exception as e:
#                 myLogger.info_logger(str(e))
#         else:
#             error_info = "##websoft9##" + const.ERROR_SERVER_RESOURCE + "##websoft9##" + "Insufficient system resources (cpu, memory, disk space)" + "##websoft9##" + "Insufficient system resources (cpu, memory, disk space)"
#             myLogger.info_logger(error_info)
#             raise Exception(error_info)
#     except CommandException as ce:
#         myLogger.info_logger(customer_name + " install failed(docker)!")
#         delete_app(job_id)
#         error_info = "##websoft9##" + ce.code + "##websoft9##" + ce.message + "##websoft9##" + ce.detail
#         myLogger.info_logger(error_info)
#         raise Exception(error_info)
#     except Exception as e:
#         myLogger.info_logger(customer_name + " install failed(system)!")
#         delete_app(job_id)
#         error_info = "##websoft9##" + const.ERROR_SERVER_SYSTEM + "##websoft9##" + 'system original error' + "##websoft9##" + str(
#             e)
#         myLogger.info_logger(error_info)
#         raise Exception(error_info)
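# A sketch of the "##websoft9##" error convention used above: failures are
# serialized into the job's exc_info as
#   "##websoft9##<code>##websoft9##<message>##websoft9##<detail>"
# and get_apps_from_queue() below splits on the delimiter to recover the fields:
#
#   exc_info = "##websoft9##" + const.ERROR_SERVER_SYSTEM + "##websoft9##" + "system original error" + "##websoft9##" + "boom"
#   parts = exc_info.split('##websoft9##')  # ['', code, message, detail]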
|
||||
|
||||
def get_createtime(official_app, app_path, customer_name):
|
||||
data_time = ""
|
||||
try:
|
||||
if official_app:
|
||||
cmd = "docker ps -f name=" + customer_name + " --format {{.RunningFor}} | head -n 1"
|
||||
result = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
|
||||
data_time = result
|
||||
else:
|
||||
cmd_all = "cd " + app_path + " && docker compose ps -a --format json"
|
||||
output = shell_execute.execute_command_output_all(cmd_all)
|
||||
container_name = json.loads(output["result"])[0]["Name"]
|
||||
cmd = "docker ps -f name=" + container_name + " --format {{.RunningFor}} | head -n 1"
|
||||
result = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
|
||||
data_time = result
|
||||
# def get_createtime(official_app, app_path, customer_name):
|
||||
# data_time = ""
|
||||
# try:
|
||||
# if official_app:
|
||||
# cmd = "docker ps -f name=" + customer_name + " --format {{.RunningFor}} | head -n 1"
|
||||
# result = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
|
||||
# data_time = result
|
||||
# else:
|
||||
# cmd_all = "cd " + app_path + " && docker compose ps -a --format json"
|
||||
# output = shell_execute.execute_command_output_all(cmd_all)
|
||||
# container_name = json.loads(output["result"])[0]["Name"]
|
||||
# cmd = "docker ps -f name=" + container_name + " --format {{.RunningFor}} | head -n 1"
|
||||
# result = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
|
||||
# data_time = result
|
||||
|
||||
except Exception as e:
|
||||
myLogger.info_logger(str(e))
|
||||
myLogger.info_logger("get_createtime get success" + data_time)
|
||||
return data_time
|
||||
# except Exception as e:
|
||||
# myLogger.info_logger(str(e))
|
||||
# myLogger.info_logger("get_createtime get success" + data_time)
|
||||
# return data_time
|
||||
|
||||
def check_if_official_app(var_path):
|
||||
if docker.check_directory(var_path):
|
||||
if docker.read_var(var_path, 'name') != "" and docker.read_var(var_path, 'trademark') != "" and docker.read_var(
|
||||
var_path, 'requirements') != "":
|
||||
requirements = docker.read_var(var_path, 'requirements')
|
||||
try:
|
||||
cpu = requirements['cpu']
|
||||
mem = requirements['memory']
|
||||
disk = requirements['disk']
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
# def check_if_official_app(var_path):
|
||||
# if docker.check_directory(var_path):
|
||||
# if docker.read_var(var_path, 'name') != "" and docker.read_var(var_path, 'trademark') != "" and docker.read_var(
|
||||
# var_path, 'requirements') != "":
|
||||
# requirements = docker.read_var(var_path, 'requirements')
|
||||
# try:
|
||||
# cpu = requirements['cpu']
|
||||
# mem = requirements['memory']
|
||||
# disk = requirements['disk']
|
||||
# return True
|
||||
# except KeyError:
|
||||
# return False
|
||||
# else:
|
||||
# return False
|
||||
|
||||
# 应用是否已经安装
|
||||
def check_app_docker(app_id):
|
||||
customer_name = app_id.split('_')[1]
|
||||
app_name = app_id.split('_')[0]
|
||||
flag = False
|
||||
cmd = "docker compose ls -a | grep \'/" + customer_name + "/\'"
|
||||
try:
|
||||
shell_execute.execute_command_output_all(cmd)
|
||||
flag = True
|
||||
myLogger.info_logger("APP in docker")
|
||||
except CommandException as ce:
|
||||
myLogger.info_logger("APP not in docker")
|
||||
# # 应用是否已经安装
|
||||
# def check_app_docker(app_id):
|
||||
# customer_name = app_id.split('_')[1]
|
||||
# app_name = app_id.split('_')[0]
|
||||
# flag = False
|
||||
# cmd = "docker compose ls -a | grep \'/" + customer_name + "/\'"
|
||||
# try:
|
||||
# shell_execute.execute_command_output_all(cmd)
|
||||
# flag = True
|
||||
# myLogger.info_logger("APP in docker")
|
||||
# except CommandException as ce:
|
||||
# myLogger.info_logger("APP not in docker")
|
||||
|
||||
return flag
|
||||
# return flag
|
||||
|
||||
|
||||
def check_app_rq(app_id):
|
||||
myLogger.info_logger("check_app_rq")
|
||||
# def check_app_rq(app_id):
|
||||
# myLogger.info_logger("check_app_rq")
|
||||
|
||||
started = StartedJobRegistry(queue=q)
|
||||
failed = FailedJobRegistry(queue=q)
|
||||
run_job_ids = started.get_job_ids()
|
||||
failed_job_ids = failed.get_job_ids()
|
||||
queue_job_ids = q.job_ids
|
||||
myLogger.info_logger(queue_job_ids)
|
||||
myLogger.info_logger(run_job_ids)
|
||||
myLogger.info_logger(failed_job_ids)
|
||||
if queue_job_ids and app_id in queue_job_ids:
|
||||
myLogger.info_logger("App in RQ")
|
||||
return True
|
||||
if failed_job_ids and app_id in failed_job_ids:
|
||||
myLogger.info_logger("App in RQ")
|
||||
return True
|
||||
if run_job_ids and app_id in run_job_ids:
|
||||
myLogger.info_logger("App in RQ")
|
||||
return True
|
||||
myLogger.info_logger("App not in RQ")
|
||||
return False
|
||||
# started = StartedJobRegistry(queue=q)
|
||||
# failed = FailedJobRegistry(queue=q)
|
||||
# run_job_ids = started.get_job_ids()
|
||||
# failed_job_ids = failed.get_job_ids()
|
||||
# queue_job_ids = q.job_ids
|
||||
# myLogger.info_logger(queue_job_ids)
|
||||
# myLogger.info_logger(run_job_ids)
|
||||
# myLogger.info_logger(failed_job_ids)
|
||||
# if queue_job_ids and app_id in queue_job_ids:
|
||||
# myLogger.info_logger("App in RQ")
|
||||
# return True
|
||||
# if failed_job_ids and app_id in failed_job_ids:
|
||||
# myLogger.info_logger("App in RQ")
|
||||
# return True
|
||||
# if run_job_ids and app_id in run_job_ids:
|
||||
# myLogger.info_logger("App in RQ")
|
||||
# return True
|
||||
# myLogger.info_logger("App not in RQ")
|
||||
# return False
|
||||
|
||||
|
||||
def get_apps_from_queue():
    myLogger.info_logger("get queue apps...")
    # Get the StartedJobRegistry instance
    started = StartedJobRegistry(queue=q)
    finish = FinishedJobRegistry(queue=q)
    deferred = DeferredJobRegistry(queue=q)
    failed = FailedJobRegistry(queue=q)
    scheduled = ScheduledJobRegistry(queue=q)
    cancel = CanceledJobRegistry(queue=q)
# def get_apps_from_queue():
#     myLogger.info_logger("get queue apps...")
#     # Get the StartedJobRegistry instance
#     started = StartedJobRegistry(queue=q)
#     finish = FinishedJobRegistry(queue=q)
#     deferred = DeferredJobRegistry(queue=q)
#     failed = FailedJobRegistry(queue=q)
#     scheduled = ScheduledJobRegistry(queue=q)
#     cancel = CanceledJobRegistry(queue=q)

    # Get the list of running job IDs
    run_job_ids = started.get_job_ids()
    finish_job_ids = finish.get_job_ids()
    wait_job_ids = deferred.get_job_ids()
    failed_jobs = failed.get_job_ids()
    scheduled_jobs = scheduled.get_job_ids()
    cancel_jobs = cancel.get_job_ids()
#     # Get the list of running job IDs
#     run_job_ids = started.get_job_ids()
#     finish_job_ids = finish.get_job_ids()
#     wait_job_ids = deferred.get_job_ids()
#     failed_jobs = failed.get_job_ids()
#     scheduled_jobs = scheduled.get_job_ids()
#     cancel_jobs = cancel.get_job_ids()

    myLogger.info_logger(q.jobs)
    myLogger.info_logger(run_job_ids)
    myLogger.info_logger(failed_jobs)
    myLogger.info_logger(cancel_jobs)
    myLogger.info_logger(wait_job_ids)
    myLogger.info_logger(finish_job_ids)
    myLogger.info_logger(scheduled_jobs)
#     myLogger.info_logger(q.jobs)
#     myLogger.info_logger(run_job_ids)
#     myLogger.info_logger(failed_jobs)
#     myLogger.info_logger(cancel_jobs)
#     myLogger.info_logger(wait_job_ids)
#     myLogger.info_logger(finish_job_ids)
#     myLogger.info_logger(scheduled_jobs)

    installing_list = []
    for job_id in run_job_ids:
        app = get_rq_app(job_id, 'installing', "", "", "")
        installing_list.append(app)
    for job in q.jobs:
        app = get_rq_app(job.id, 'installing', "", "", "")
        installing_list.append(app)
    for job_id in failed_jobs:
        job = q.fetch_job(job_id)
        exc_info = job.exc_info
        code = exc_info.split('##websoft9##')[1]
        message = exc_info.split('##websoft9##')[2]
        detail = exc_info.split('##websoft9##')[3]
        app = get_rq_app(job_id, 'failed', code, message, detail)
        installing_list.append(app)
#     installing_list = []
#     for job_id in run_job_ids:
#         app = get_rq_app(job_id, 'installing', "", "", "")
#         installing_list.append(app)
#     for job in q.jobs:
#         app = get_rq_app(job.id, 'installing', "", "", "")
#         installing_list.append(app)
#     for job_id in failed_jobs:
#         job = q.fetch_job(job_id)
#         exc_info = job.exc_info
#         code = exc_info.split('##websoft9##')[1]
#         message = exc_info.split('##websoft9##')[2]
#         detail = exc_info.split('##websoft9##')[3]
#         app = get_rq_app(job_id, 'failed', code, message, detail)
#         installing_list.append(app)

    return installing_list
#     return installing_list

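# Illustrative sketch (hypothetical values): the failed-job branch above assumes
# the worker packs code, message and detail into the exception text, separated
# by the '##websoft9##' marker, so they can be recovered with str.split().
exc_info_example = "Traceback ...##websoft9##E1001##websoft9##install failed##websoft9##compose pull error"
_, code_ex, message_ex, detail_ex = exc_info_example.split('##websoft9##')
print(code_ex, message_ex, detail_ex)  # -> E1001 install failed compose pull error
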
# Get app info from RQ
def get_rq_app(id, status, code, message, detail):
    app_name = id.split('_')[0]
    customer_name = id.split('_')[1]
    # While the app is still in RQ, its folder may not exist yet, so trade_mark cannot be read
    trade_mark = ""
    app_version = ""
    create_time = ""
    volume_data = ""
    config_path = ""
    image_url = get_Image_url(app_name)
    config = None
    if status == "installing":
        status_reason = None
    else:
        status_reason = StatusReason(Code=code, Message=message, Detail=detail)
# # Get app info from RQ
# def get_rq_app(id, status, code, message, detail):
#     app_name = id.split('_')[0]
#     customer_name = id.split('_')[1]
#     # While the app is still in RQ, its folder may not exist yet, so trade_mark cannot be read
#     trade_mark = ""
#     app_version = ""
#     create_time = ""
#     volume_data = ""
#     config_path = ""
#     image_url = get_Image_url(app_name)
#     config = None
#     if status == "installing":
#         status_reason = None
#     else:
#         status_reason = StatusReason(Code=code, Message=message, Detail=detail)

    app = App(app_id=id, app_name=app_name, customer_name=customer_name, trade_mark=trade_mark,
              app_version=app_version, create_time=create_time, volume_data=volume_data, config_path=config_path,
              status=status, status_reason=status_reason, official_app=True, image_url=image_url,
              app_https=False, app_replace_url=False, config=config)
    return app.dict()
#     app = App(app_id=id, app_name=app_name, customer_name=customer_name, trade_mark=trade_mark,
#               app_version=app_version, create_time=create_time, volume_data=volume_data, config_path=config_path,
#               status=status, status_reason=status_reason, official_app=True, image_url=image_url,
#               app_https=False, app_replace_url=False, config=config)
#     return app.dict()

def get_admin_url(customer_name, url):
    admin_url = ""
    path = "/data/apps/" + customer_name + "/.env"
    try:
        admin_path = list(docker.read_env(path, "APP_ADMIN_PATH").values())[0]
        admin_path = admin_path.replace("\"", "")
        admin_url = url + admin_path
    except IndexError:
        pass
    return admin_url
# def get_admin_url(customer_name, url):
#     admin_url = ""
#     path = "/data/apps/" + customer_name + "/.env"
#     try:
#         admin_path = list(docker.read_env(path, "APP_ADMIN_PATH").values())[0]
#         admin_path = admin_path.replace("\"", "")
#         admin_url = url + admin_path
#     except IndexError:
#         pass
#     return admin_url

def get_container_port(container_name):
    port = "80"
    cmd = "docker port " + container_name + " |grep ::"
    result = shell_execute.execute_command_output_all(cmd)["result"]
    myLogger.info_logger(result)
    port = result.split('/')[0]
    myLogger.info_logger(port)
# def get_container_port(container_name):
#     port = "80"
#     cmd = "docker port " + container_name + " |grep ::"
#     result = shell_execute.execute_command_output_all(cmd)["result"]
#     myLogger.info_logger(result)
#     port = result.split('/')[0]
#     myLogger.info_logger(port)

    return port
#     return port
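# Illustrative sketch (hypothetical output): `docker port <name> | grep ::`
# prints mappings such as "80/tcp -> :::8080"; splitting on '/' keeps the
# container-side port, which is what the proxy needs to forward to.
sample_line = "80/tcp -> :::8080"
container_port = sample_line.split('/')[0]
print(container_port)  # -> 80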
@ -1,3 +1,6 @@
import time


def app_domain_list(app_id):
    code, message = docker.check_app_id(app_id)
    if code == None:
@ -5,7 +8,8 @@ def app_domain_list(app_id):
        if flag:
            myLogger.info_logger("Check app_id ok[app_domain_list]")
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
            raise CommandException(
                const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
    else:
        raise CommandException(code, message, "")

@ -19,7 +23,8 @@ def app_domain_list(app_id):
    default_domain = ""
    if domains != None and len(domains) > 0:
        customer_name = app_id.split('_')[1]
        app_url = shell_execute.execute_command_output_all("cat /data/apps/" + customer_name + "/.env")["result"]
        app_url = shell_execute.execute_command_output_all(
            "cat /data/apps/" + customer_name + "/.env")["result"]
        if "APP_URL" in app_url:
            url = shell_execute.execute_command_output_all("cat /data/apps/" + customer_name + "/.env |grep APP_URL=")[
                "result"].rstrip('\n')
@ -28,6 +33,26 @@ def app_domain_list(app_id):
    myLogger.info_logger(ret)
    return ret

def get_all_domains(app_id):
    customer_name = app_id.split('_')[1]
    domains = []
    token = get_token()
    url = const.NGINX_URL+"/api/nginx/proxy-hosts"
    headers = {
        'Authorization': token,
        'Content-Type': 'application/json'
    }
    response = requests.get(url, headers=headers)

    for proxy in response.json():
        portainer_name = proxy["forward_host"]
        if customer_name == portainer_name:
            for domain in proxy["domain_names"]:
                domains.append(domain)
    return domains

def app_proxy_delete(app_id):
    customer_name = app_id.split('_')[1]
    proxy_host = None
@ -59,17 +84,20 @@ def app_domain_delete(app_id, domain):
        if flag:
            myLogger.info_logger("Check app_id ok[app_domain_delete]")
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
            raise CommandException(
                const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
    else:
        raise CommandException(code, message, "")

    if domain is None or domain == "undefined":
        raise CommandException(const.ERROR_CLIENT_PARAM_BLANK, "Domains is blank", "")
        raise CommandException(
            const.ERROR_CLIENT_PARAM_BLANK, "Domains is blank", "")

    old_all_domains = get_all_domains(app_id)
    if domain not in old_all_domains:
        myLogger.info_logger("delete domain is not bound")
        raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "Domain is not bind.", "")
        raise CommandException(
            const.ERROR_CLIENT_PARAM_NOTEXIST, "Domain is not bind.", "")

    myLogger.info_logger("Start to delete " + domain)
    proxy = get_proxy_domain(app_id, domain)
@ -93,7 +121,8 @@ def app_domain_delete(app_id, domain):
        response = requests.delete(url, headers=headers)
        try:
            if response.json().get("error"):
                raise CommandException(const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
                raise CommandException(
                    const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
        except Exception:
            myLogger.info_logger(response.json())
        set_domain("", app_id)
@ -129,9 +158,11 @@ def app_domain_delete(app_id, domain):
            "ssl_forced": False
        }

        response = requests.put(url, data=json.dumps(data), headers=headers)
        response = requests.put(
            url, data=json.dumps(data), headers=headers)
        if response.json().get("error"):
            raise CommandException(const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
            raise CommandException(
                const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
        domain_set = app_domain_list(app_id)
        default_domain = domain_set['default_domain']
        # If the deleted domain was the default domain, fall back to the first remaining domain
@ -139,7 +170,9 @@ def app_domain_delete(app_id, domain):
            set_domain(domains_old[0], app_id)

    else:
        raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "Delete domain is not bind", "")
        raise CommandException(
            const.ERROR_CLIENT_PARAM_NOTEXIST, "Delete domain is not bind", "")

def app_domain_update(app_id, domain_old, domain_new):
    myLogger.info_logger("app_domain_update")
@ -155,7 +188,8 @@ def app_domain_update(app_id, domain_old, domain_new):
        if flag:
            myLogger.info_logger("Check app_id ok")
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
            raise CommandException(
                const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
    else:
        raise CommandException(code, message, "")
    proxy = get_proxy_domain(app_id, domain_old)
@ -196,15 +230,19 @@ def app_domain_update(app_id, domain_old, domain_new):

        response = requests.put(url, data=json.dumps(data), headers=headers)
        if response.json().get("error"):
            raise CommandException(const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
            raise CommandException(
                const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
        domain_set = app_domain_list(app_id)
        default_domain = domain_set['default_domain']
        myLogger.info_logger("default_domain=" + default_domain + ",domain_old=" + domain_old)
        myLogger.info_logger("default_domain=" +
                             default_domain + ",domain_old=" + domain_old)
        # If the edited domain was the default domain, keep the new domain as the default
        if default_domain == domain_old:
            set_domain(domain_new, app_id)
    else:
        raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "edit domain is not exist", "")
        raise CommandException(
            const.ERROR_CLIENT_PARAM_NOTEXIST, "edit domain is not exist", "")

def app_domain_add(app_id, domain):
    temp_domains = []
@ -217,13 +255,15 @@ def app_domain_add(app_id, domain):
        if flag:
            myLogger.info_logger("Check app_id ok")
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
            raise CommandException(
                const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
    else:
        raise CommandException(code, message, "")

    old_domains = get_all_domains(app_id)
    if domain in old_domains:
        raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "Domain is in use", "")
        raise CommandException(
            const.ERROR_CLIENT_PARAM_NOTEXIST, "Domain is in use", "")

    proxy = get_proxy(app_id)
    if proxy != None:
@ -263,7 +303,8 @@ def app_domain_add(app_id, domain):
        }
        response = requests.put(url, data=json.dumps(data), headers=headers)
        if response.json().get("error"):
            raise CommandException(const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
            raise CommandException(
                const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
    else:
        # Append a new proxy host
        token = get_token()
@ -300,22 +341,27 @@ def app_domain_add(app_id, domain):
        response = requests.post(url, data=json.dumps(data), headers=headers)

        if response.json().get("error"):
            raise CommandException(const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
            raise CommandException(
                const.ERROR_CONFIG_NGINX, response.json().get("error").get("message"), "")
    set_domain(domain, app_id)

    return domain

def check_domains(domains):
    myLogger.info_logger(domains)
    if domains is None or len(domains) == 0:
        raise CommandException(const.ERROR_CLIENT_PARAM_BLANK, "Domains is blank", "")
        raise CommandException(
            const.ERROR_CLIENT_PARAM_BLANK, "Domains is blank", "")
    else:
        for domain in domains:
            if is_valid_domain(domain):
                if check_real_domain(domain) == False:
                    raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "Domain and server not match", "")
                    raise CommandException(
                        const.ERROR_CLIENT_PARAM_NOTEXIST, "Domain and server not match", "")
            else:
                raise CommandException(const.ERROR_CLIENT_PARAM_Format, "Domains format error", "")
                raise CommandException(
                    const.ERROR_CLIENT_PARAM_Format, "Domains format error", "")

def is_valid_domain(domain):
@ -324,13 +370,17 @@ def is_valid_domain(domain):

    return True

def check_real_domain(domain):
    domain_real = True
    try:
        cmd = "ping -c 1 " + domain + " | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | uniq"
        domain_ip = shell_execute.execute_command_output_all(cmd)["result"].rstrip('\n')
        cmd = "ping -c 1 " + domain + \
            " | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | uniq"
        domain_ip = shell_execute.execute_command_output_all(cmd)[
            "result"].rstrip('\n')

        ip_result = shell_execute.execute_command_output_all("cat /data/apps/w9services/w9appmanage/public_ip")
        ip_result = shell_execute.execute_command_output_all(
            "cat /data/apps/w9services/w9appmanage/public_ip")
        ip_save = ip_result["result"].rstrip('\n')

        if domain_ip == ip_save:
@ -368,25 +418,6 @@ def get_proxy_domain(app_id, domain):
    return proxy_host

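# Illustrative sketch (alternative technique, not the committed code): the same
# "does this domain point at this server" check can use the stdlib socket module
# instead of shelling out to ping; the public_ip path is the one read above.
import socket

def resolves_to_this_server(domain: str) -> bool:
    try:
        domain_ip = socket.gethostbyname(domain)  # first A record only
    except socket.gaierror:
        return False
    with open("/data/apps/w9services/w9appmanage/public_ip") as f:
        public_ip = f.read().strip()
    return domain_ip == public_ip
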
def get_all_domains(app_id):
    customer_name = app_id.split('_')[1]
    domains = []
    token = get_token()
    url = const.NGINX_URL+"/api/nginx/proxy-hosts"
    headers = {
        'Authorization': token,
        'Content-Type': 'application/json'
    }
    response = requests.get(url, headers=headers)

    for proxy in response.json():
        portainer_name = proxy["forward_host"]
        if customer_name == portainer_name:
            for domain in proxy["domain_names"]:
                domains.append(domain)
    return domains

def app_domain_set(domain, app_id):
    temp_domains = []
    temp_domains.append(domain)
@ -398,7 +429,8 @@ def app_domain_set(domain, app_id):
        if flag:
            myLogger.info_logger("Check app_id ok")
        else:
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
            raise CommandException(
                const.ERROR_CLIENT_PARAM_NOTEXIST, "APP is not exist", "")
    else:
        raise CommandException(code, message, "")

@ -411,34 +443,42 @@ def set_domain(domain, app_id):
    if domain != "":
        if domain not in old_domains:
            message = domain + " is not in use"
            raise CommandException(const.ERROR_CLIENT_PARAM_NOTEXIST, message, "")
            raise CommandException(
                const.ERROR_CLIENT_PARAM_NOTEXIST, message, "")

    customer_name = app_id.split('_')[1]
    app_url = shell_execute.execute_command_output_all("cat /data/apps/" + customer_name + "/.env")["result"]
    app_url = shell_execute.execute_command_output_all(
        "cat /data/apps/" + customer_name + "/.env")["result"]

    if "APP_URL" in app_url:
        myLogger.info_logger("APP_URL is exist")
        if domain == "":
            ip_result = shell_execute.execute_command_output_all("cat /data/apps/w9services/w9appmanage/public_ip")
            ip_result = shell_execute.execute_command_output_all(
                "cat /data/apps/w9services/w9appmanage/public_ip")
            domain = ip_result["result"].rstrip('\n')
            cmd = "sed -i 's/APP_URL=.*/APP_URL=" + domain + "/g' /data/apps/" + customer_name + "/.env"
            cmd = "sed -i 's/APP_URL=.*/APP_URL=" + domain + \
                "/g' /data/apps/" + customer_name + "/.env"
            shell_execute.execute_command_output_all(cmd)
            if "APP_URL_REPLACE=true" in app_url:
                myLogger.info_logger("need up")
                shell_execute.execute_command_output_all("cd /data/apps/" + customer_name + " && docker compose up -d")
                shell_execute.execute_command_output_all(
                    "cd /data/apps/" + customer_name + " && docker compose up -d")
        else:
            cmd = "sed -i 's/APP_URL=.*/APP_URL=" + domain + "/g' /data/apps/" + customer_name + "/.env"
            cmd = "sed -i 's/APP_URL=.*/APP_URL=" + domain + \
                "/g' /data/apps/" + customer_name + "/.env"
            shell_execute.execute_command_output_all(cmd)
            if "APP_URL_REPLACE=true" in app_url:
                myLogger.info_logger("need up")
                shell_execute.execute_command_output_all("cd /data/apps/" + customer_name + " && docker compose up -d")
                shell_execute.execute_command_output_all(
                    "cd /data/apps/" + customer_name + " && docker compose up -d")
    else:
        myLogger.info_logger("APP_URL is not exist")
        if domain == "":
            ip_result = shell_execute.execute_command_output_all("cat /data/apps/w9services/w9appmanage/public_ip")
            ip_result = shell_execute.execute_command_output_all(
                "cat /data/apps/w9services/w9appmanage/public_ip")
            domain = ip_result["result"].rstrip('\n')

        cmd = "sed -i '/APP_NETWORK/a APP_URL=" + domain + "' /data/apps/" + customer_name + "/.env"
        cmd = "sed -i '/APP_NETWORK/a APP_URL=" + domain + \
            "' /data/apps/" + customer_name + "/.env"
        shell_execute.execute_command_output_all(cmd)
    myLogger.info_logger("set_domain success")

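# Illustrative sketch (alternative technique, not the committed code): the sed
# commands above rewrite or append APP_URL in the app's .env file; the same edit
# in pure Python, with env_path standing in for /data/apps/<customer_name>/.env.
def set_app_url(env_path: str, domain: str) -> None:
    with open(env_path) as f:
        lines = f.readlines()
    for i, line in enumerate(lines):
        if line.startswith("APP_URL="):
            lines[i] = "APP_URL=" + domain + "\n"
            break
    else:
        # No APP_URL line yet: append one, mirroring the sed '/APP_NETWORK/a' branch
        lines.append("APP_URL=" + domain + "\n")
    with open(env_path, "w") as f:
        f.writelines(lines)
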
97
appmanage_new/app/services/domain_manager.py
Normal file
@ -0,0 +1,97 @@
import time
import keyring
import json
from app.core.logger import logger
from app.external.nginx_proxy_manager_api import NginxProxyManagerAPI


class DomainManager:
    def __init__(self, app_name):
        """
        Init Domain Manager

        Args:
            app_name (str): The name of the app
        """
        self.app_name = app_name
        try:
            self.nginx = NginxProxyManagerAPI()
            self._init_nginx_token()
        except Exception as e:
            logger.error(f"Init Nginx Proxy Manager API Error:{e}")
            raise e

    def _init_nginx_token(self):
        """
        Get the Nginx Proxy Manager token from the keyring; if it is expired
        or missing, request a new one and store it back in the keyring.
        """
        service_name = 'nginx_proxy_manager'
        token_name = "nginx_token"

        # Try to get the token from the keyring
        try:
            token_json_str = keyring.get_password(service_name, token_name)
        except Exception as e:
            logger.error(f"Get Nginx Proxy Manager's Token From Keyring Error:{e}")
            token_json_str = None

        # If the token was found in the keyring, parse it
        if token_json_str is not None:
            token_json = json.loads(token_json_str)
            expires = token_json.get("expires")
            api_token = token_json.get("token")

            # If the token has more than an hour of lifetime left, use it
            if int(expires) - int(time.time()) > 3600:
                self.nginx.api_token = api_token
                return

        # If the token is expired or was not found, request a new one
        try:
            nginx_tokens = self.nginx.get_token("userName", "userPwd")
        except Exception as e:
            logger.error(f"Get Nginx Proxy Manager's Token Error:{e}")
            return

        expires = nginx_tokens.get("expires")
        api_token = nginx_tokens.get("token")

        self.nginx.api_token = api_token

        token_json = {
            "expires": expires,
            "token": api_token
        }

        # Store the new token in the keyring
        try:
            keyring.set_password(service_name, token_name, json.dumps(token_json))
        except Exception as e:
            logger.error(f"Set Nginx Proxy Manager's Token To Keyring Error:{e}")
            return

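    # Illustrative sketch (hypothetical service and credential names): the
    # caching pattern above, reduced to its keyring round trip.
    #
    #     def cache_token(token, expires_at):
    #         keyring.set_password("example_service", "example_token",
    #                              json.dumps({"token": token, "expires": expires_at}))
    #
    #     def load_cached_token():
    #         raw = keyring.get_password("example_service", "example_token")
    #         if raw is None:
    #             return None
    #         data = json.loads(raw)
    #         if int(data["expires"]) - int(time.time()) <= 3600:  # < 1h left: expired
    #             return None
    #         return data["token"]
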
    def is_valid_domain(self, domain_names: list[str]):
        # Validate the format of every domain in domain_names; if any are
        # invalid, return them as a list. If all are valid, check whether each
        # one resolves to this server, and return any that do not as a list.
        # Check the domain format
        invalid_domain_names = []
        for domain_name in domain_names:
            if not self.nginx.is_valid_domain(domain_name):
                invalid_domain_names.append(domain_name)
        if len(invalid_domain_names) > 0:
            return False, invalid_domain_names
        # Check whether the domains resolve to this server
        not_resolved_domain_names = []
        for domain_name in domain_names:
            if not self.nginx.is_resolved_domain(domain_name):
                not_resolved_domain_names.append(domain_name)
        if len(not_resolved_domain_names) > 0:
            return False, not_resolved_domain_names
        return True, None

    def create_proxy_for_app(self, domain_names: list[str], forward_port: int, advanced_config: str = "", forward_scheme: str = "http"):
        try:
            self.nginx.create_proxy_host(domain_names=domain_names, forward_scheme=forward_scheme, forward_port=forward_port, advanced_config=advanced_config)
        except Exception as e:
            logger.error(f"Create Proxy Host For {self.app_name} Error {e}")
            raise e
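# Illustrative usage sketch (hypothetical app name, domain and port):
#
#     manager = DomainManager("wordpress")
#     ok, bad = manager.is_valid_domain(["blog.example.com"])
#     if ok:
#         manager.create_proxy_for_app(["blog.example.com"], forward_port=9001)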
@ -1,41 +0,0 @@
#!/bin/bash
url_list=(
    api.ipify.org
    bot.whatismyipaddress.com
    icanhazip.com
    ifconfig.co
    ident.me
    ifconfig.me
    icanhazip.com
    ipecho.net/plain
    ipinfo.io/ip
    ip.sb
    whatismyip.akamai.com
    inet-ip.info
)

curl_ip(){
    curl --connect-timeout 1 -m 2 $1 2>/dev/null
    return $?
}

debug(){
    for x in ${url_list[*]}
    do
        curl_ip $x
    done
}

print_ip(){
    for n in ${url_list[*]}
    do
        public_ip=`curl_ip $n`
        check_ip=`echo $public_ip | awk -F"." '{print NF}'`
        if [ ! -z "$public_ip" -a $check_ip -eq "4" ]; then
            echo $public_ip
            exit 0
        fi
    done
}
#debug
print_ip
@ -1,6 +0,0 @@
class Singleton(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
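# Illustrative sketch (hypothetical class): how this now-removed metaclass was
# used; every instantiation returns the same object.
class AppConfig(metaclass=Singleton):
    def __init__(self):
        self.loaded = True

assert AppConfig() is AppConfig()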
68
appmanage_new/app/utils/public_ip_getter.py
Normal file
@ -0,0 +1,68 @@
import requests

from app.core.config import ConfigManager

class PublicIPGetter:
    """
    A utility class to retrieve a valid IPv4 address.

    Attributes:
        url_list (list[str]): A list of URLs to retrieve the response from.

    Methods:
        get_ip_address(): Retrieves and returns a valid IPv4 address.
    """

    def __init__(self):
        """
        Initializes the PublicIPGetter class.
        """
        self.url_list = [url.strip() for url in ConfigManager().get_value("public_ip_url_list", "url_list").split(',')]

    def get_ip_address(self):
        """
        Retrieves and returns a valid IPv4 address from the list of URLs.

        Returns:
            str: The valid IPv4 address if found, otherwise None.
        """
        for url in self.url_list:
            ip = self._get_ip(url)
            if ip and self._is_valid_ipv4(ip):
                return ip
        return None

    def _get_ip(self, url):
        """
        Retrieves and returns the response from the given URL.

        Args:
            url (str): The URL to retrieve the response from.

        Returns:
            str: The response from the given URL if found, otherwise None.
        """
        try:
            response = requests.get(url, timeout=2)
            if response.status_code == 200:
                return response.text.strip()
        except requests.RequestException:
            pass
        return None

    def _is_valid_ipv4(self, ip) -> bool:
        """
        Checks if the given string is a valid IPv4 address.

        Args:
            ip (str): The string to check.

        Returns:
            bool: True if the string is a valid IPv4 address, otherwise False.
        """
        parts = ip.split('.')
        return len(parts) == 4 and all(0 <= int(part) < 256 for part in parts)

if __name__ == "__main__":
    ip = PublicIPGetter().get_ip_address()
    print(ip)
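# Illustrative sketch (stdlib alternative, not the committed code): _is_valid_ipv4
# above raises ValueError when a response contains non-numeric parts (for example
# an HTML error page); the ipaddress module covers that case without crashing.
import ipaddress

def is_valid_ipv4(text: str) -> bool:
    try:
        ipaddress.IPv4Address(text)
        return True
    except ValueError:
        return False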
@ -34,7 +34,6 @@ def execute_command_output_all(cmd_str):
        raise CommandException(const.ERROR_SERVER_COMMAND, "Docker returns the original error", process.stderr)


# This function converts container commands to host machine commands
def convert_command(cmd_str):
    convert_cmd = ""

@ -1,68 +0,0 @@
from api.utils.log import myLogger
from api.utils.helper import Singleton


# This class adds/modifies/lists/deletes items in an item=value (key-value pair) style settings file

class SettingsFile(object):

    __metaclass__ = Singleton

    def __init__(self, path):
        self._config = {}
        self.config_file = path

    def build_config(self):
        try:
            with open(self.config_file, 'r') as f:
                data = f.readlines()
        except Exception as e:
            data = []
        for i in data:
            if i.startswith('#'):
                continue
            i = i.replace('\n', '').replace('\r\n', '')
            if not i:
                continue
            tmp = i.split('=')
            if len(tmp) != 2:
                myLogger.error_logger(f'invalid format {i}')
                continue

            key, value = i.split('=')
            if self._config.get(key) != value:
                self._config[key] = value
        return self._config

    def init_config_from_file(self, config_file: str = None):
        if config_file:
            self.config_file = config_file
        self.build_config()

    def update_setting(self, key: str, value: str):
        self._config[key] = value
        self.flush_config()

    def get_setting(self, key: str, default=None):
        return self._config.get(key, default)

    def list_all_settings(self) -> dict:
        self.build_config()
        return self._config

    def delete_setting(self, key: str, value: str):
        if self._config.get(key) == value:
            del self._config[key]
            self.flush_config()

    def flush_config(self):
        try:
            with open(self.config_file, 'w') as f:
                for key, value in self._config.items():
                    f.write(f'{key}={value}\n')
        except Exception as e:
            myLogger.error_logger(e)


# This class adds/modifies/views/deletes content from a file
# src: path | URL
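# Illustrative usage sketch (hypothetical path) of the removed SettingsFile class:
#
#     settings = SettingsFile("/tmp/example_settings.conf")
#     settings.init_config_from_file()
#     settings.update_setting("APP_PORT", "9001")  # writes APP_PORT=9001 back to the file
#     print(settings.get_setting("APP_PORT"))      # -> 9001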
@ -1,7 +1,7 @@
from fastapi import FastAPI
from fastapi.routing import APIRouter
from api.v1 import main as v1_router
# from fastapi import FastAPI
# from fastapi.routing import APIRouter
# from api.v1 import main as v1_router

app = FastAPI()
# app = FastAPI()

app.include_router(v1_router.router, prefix="/api/v1")
# app.include_router(v1_router.router, prefix="/api/v1")

@ -6,5 +6,4 @@ docker
psutil
gunicorn
python-dotenv
sqlalchemy
databases[sqlite]
keyring