update appmanae_new

This commit is contained in:
zhaojing1987 2023-09-19 14:58:05 +08:00
parent 27bfab00fb
commit baef6daf80
21 changed files with 2780 additions and 307 deletions

View File

@ -1,4 +1,5 @@
# run app : uvicorn app.main:app --reload --port 8080
# run app : uvicorn src.main:app --reload --port 9999
# gitea_token: da7b9891a0bc71b5026b389c11ed13238c9a3866
# 项目结构

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@ fastapi
uvicorn
keyring
requests
git
keyrings.alt
requests
GitPython
PyJWT

View File

@ -1,22 +1,30 @@
from fastapi import APIRouter, HTTPException, Query
from typing import List, Optional
from fastapi import APIRouter, Query
from src.schemas.appInstall import appInstall
from src.schemas.errorResponse import ErrorResponse
from src.services.app_manager import AppManger
from pydantic import BaseModel
router = APIRouter(prefix="/api/v1")
from src.schemas.appInstall import appInstallPayload
router = APIRouter()
@router.get("/apps/")
def get_apps():
return {"apps": "apps"}
return {"apps": []}
@router.post("/apps/install",summary="Install App",description="Install an app on an endpoint",responses={400: {"description": "Invalid EndpointId"}, 500: {"description": "Internal Server Error"}})
def apps_install(app_install_payload: appInstallPayload, endpointId: str = Query(..., description="Endpoint ID to install app on")):
try:
if endpointId < 0:
raise HTTPException(status_code=400, detail="Invalid EndpointId")
app_name = app_install_payload.app_name
return app_name
except Exception as e:
raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post(
"/apps/install",
summary="Install App",
response_model_exclude_defaults=True,
description="Install an app on an endpoint",
responses={
200: {"description": "Success"},
400: {"model": ErrorResponse},
500: {"model": ErrorResponse},
}
)
def apps_install(
appInstall: appInstall,
endpointId: int = Query(None, description="Endpoint ID to install app on,if not set, install on the local endpoint"),
):
appManger = AppManger()
appManger.install_app(appInstall, endpointId)

View File

@ -0,0 +1,12 @@
from fastapi import APIRouter, Query
from typing import List, Optional
router = APIRouter()
@router.get("/proxys".format(),summary="Get proxys",description="Get proxys")
def get_proxys():
return {"proxys": "proxys"}
@router.put("/proxys")
def update_settings():
return {"proxys": "proxys"}

View File

@ -1,6 +1,31 @@
# nginx_proxy_manager is the base url of nginx proxy manager, which is used to configure the proxy
# The config for appmanage
[appmanage]
access_token =
# The config for nginx proxy manager
[nginx_proxy_manager]
base_url = http://websoft9-nginxproxymanager:81
# base_url = http://websoft9-nginxproxymanager:81/api
base_url = http://47.92.222.186/nginxproxymanager/api
user_name = help@websoft9.com
user_pwd = websoft9@123456
#The config for gitea
[gitea]
#base_url = http://websoft9-gitea:3000/api/v1
base_url = http://47.92.222.186/git/api/v1
user_name = websoft9
user_pwd = websoft9
#The config for portainer
[portainer]
# base_url = http://websoft9-portainer:9000/api
base_url = http://47.92.222.186/portainer/api
user_name = admin
user_pwd = websoft9@123456
#The path of docker library
[docker_library]
path = /data/library/apps
# public_ip_url_list is a list of public ip url, which is used to get the public ip of the server
[public_ip_url_list]
@ -14,4 +39,5 @@ url_list = https://api.ipify.org/,
https://ip.sb/,
http://whatismyip.akamai.com/,
https://inet-ip.info/,
http://bot.whatismyipaddress.com/
http://bot.whatismyipaddress.com/

View File

@ -0,0 +1,101 @@
import requests
class APIHelper:
"""
Helper class for making API calls
Attributes:
base_url (str): Base URL for API
headers (dict): Headers
Methods:
get(path: str, params: dict = None, headers: dict = None) -> Response: Get a resource
post(path: str, params: dict = None, json: dict = None, headers: dict = None) -> Response: Create a resource
put(path: str, params: dict = None, json: dict = None, headers: dict = None) -> Response: Update a resource
delete(path: str, headers: dict = None) -> Response: Delete a resource
"""
def __init__(self, base_url, headers=None):
"""
Initialize the APIHelper instance.
Args:
base_url (str): Base URL for API
headers (dict): Headers
"""
self.base_url = base_url
self.headers = headers
def get(self, path, params=None, headers=None):
"""
Get a resource
Args:
path (str): Path to resource
params (dict): Query parameters
headers (dict): Headers
Returns:
Response: Response from API
"""
url = f"{self.base_url}/{path}"
return requests.get(url, params=params, headers=self._merge_headers(headers))
def post(self, path, params=None, json=None, headers=None):
"""
Create a resource
Args:
path (str): Path to resource
params (dict): Query parameters
json (dict): JSON payload
headers (dict): Headers
Returns:
Response: Response from API
"""
url = f"{self.base_url}/{path}"
return requests.post(url, params=params, json=json, headers=self._merge_headers(headers))
def put(self, path, params=None, json=None, headers=None):
"""
Update a resource
Args:
path (str): Path to resource
params (dict): Query parameters
json (dict): JSON payload
headers (dict): Headers
Returns:
Response: Response from API
"""
url = f"{self.base_url}/{path}"
return requests.put(url, params=params, json=json, headers=self._merge_headers(headers))
def delete(self, path, headers=None):
"""
Delete a resource
Args:
path (str): Path to resource
headers (dict): Headers
Returns:
Response: Response from API
"""
url = f"{self.base_url}/{path}"
return requests.delete(url, headers=self._merge_headers(headers))
def _merge_headers(self, headers):
"""
Merge the headers passed in with the headers set on the APIHelper instance.
Args:
headers (dict): Headers to merge
Returns:
dict: Merged headers
"""
if self.headers and headers:
return {**self.headers, **headers}
return self.headers or headers

View File

@ -1,4 +1,4 @@
import os
import configparser
@ -17,14 +17,19 @@ class ConfigManager:
config (configparser.ConfigParser): The configuration data in memory.
"""
def __init__(self, config_file_path="../config/config.ini"):
def __init__(self, config_file_name="config.ini"):
"""
Initialize a ConfigManager instance.
Args:
config_file_path (str): The path to the configuration file.
config_file_name (str): The name of the configuration file in the config directory, default is "config.ini".
"""
self.config_file_path = config_file_path
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, "../config")
self.config_file_path = os.path.join(config_dir, config_file_name)
self.config_file_path = os.path.abspath(self.config_file_path)
self.config = configparser.ConfigParser()
self.config.read(self.config_file_path)

View File

@ -0,0 +1,13 @@
class CustomException(Exception):
"""
Custom Exception
Attributes:
status_code (int): HTTP status code,default is 500
message (str): Error message,default is "Internal Server Error"
details (str): Error details,default is "Internal Server Error"
"""
def __init__(self, status_code: int=500, message: str="Internal Server Error", details: str="Internal Server Error"):
self.status_code = status_code
self.message = message
self.details = details

View File

@ -1,4 +1,5 @@
from datetime import datetime
import os
import logging
from logging.handlers import TimedRotatingFileHandler
@ -48,12 +49,13 @@ class Logger(metaclass=SingletonMeta):
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
'%(asctime)s - %(levelname)s - %(message)s')
log_folder = os.path.join(os.getcwd(), "logs")
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, f"{log_type}_{{asctime}}.log")
current_time = datetime.now().strftime('%Y_%m_%d')
log_file = os.path.join(log_folder, f"{log_type}_{current_time}.log")
file_handler = TimedRotatingFileHandler(
filename=log_file,

View File

@ -1,3 +1,128 @@
import base64
from src.core.apiHelper import APIHelper
from src.core.config import ConfigManager
class GiteaAPI:
def __init__(self) -> None:
pass
"""
This class is used to interact with Gitea API
Attributes:
api (APIHelper): API helper
Methods:
get_repos() -> Response: Get repositories
create_repo(repo_name: str) -> Response: Create repository
remove_repo(repo_name: str) -> Response: Remove repository
update_file_in_repo(repo_name: str, file_path: str, content: str) -> Response: Update file in repository
get_file_content_from_repo(repo_name: str, file_path: str) -> Response: Get file content from repository
update_file_content_in_repo(repo_name: str, file_path: str, content: str, sha: str) -> Response: Update file content in repository
"""
def __init__(self):
"""
Initialize the GiteaAPI instance
"""
self.owner = ConfigManager().get_value("gitea", "user_name")
self.api = APIHelper(
ConfigManager().get_value("gitea", "base_url"),
{
"Content-Type": "application/json",
},
)
def set_credential(self, credential: str):
"""
Set credential
Args:
credential (str): Credential
"""
self.api.headers["Authorization"] = f"Basic {credential}"
# def get_repos(self):
# return self.api.get(path="user/repos", params={"page": 1, "limit": 1000})
def get_repo_by_name(self, repo_name: str):
"""
Get repository by name
Args:
repo_name (str): Repository name
Returns:
Response: Response from Gitea API
"""
return self.api.get(path=f"repos/{self.owner}/{repo_name}")
def create_repo(self, repo_name: str):
"""
Create repository
Args:
repo_name (str): Repository name
Returns:
Response: Response from Gitea API
"""
return self.api.post(
path="user/repos",
json={
"auto_init": True,
"default_branch": "main",
"name": repo_name,
"trust_model": "default",
},
)
def remove_repo(self, repo_name: str):
"""
Remove repository
Args:
repo_name (str): Repository name
Returns:
Response: Response from Gitea API
"""
return self.api.delete(path=f"repos/{self.owner}/{repo_name}")
def get_file_content_from_repo(self, repo_name: str, file_path: str):
"""
Get file content from repository
Args:
repo_name (str): Repository name
file_path (str): File path
Returns:
Response: Response from Gitea API
"""
return self.api.get(
path=f"repos/{self.owner}/{repo_name}/contents/{file_path}",
params={"ref": "main"},
)
def update_file_content_in_repo(self, repo_name: str, file_path: str, content: str, sha: str):
"""
Update file content in repository
Args:
repo_name (str): Repository name
file_path (str): File path
content (str): Content: base64 encoded
sha (str): SHA
Returns:
Response: Response from Gitea API
"""
return self.api.put(
path=f"repos/{self.owner}/{repo_name}/contents/{file_path}",
json={
"branch": "main",
"sha": sha,
"content": content,
"message": f"Update {file_path}",
},
)

View File

@ -1,263 +1,169 @@
import requests
from typing import List, Union
from typing import List
from src.core.apiHelper import APIHelper
from src.core.config import ConfigManager
class NginxProxyManagerAPI:
"""
This class provides methods to interact with the Nginx Proxy Manager API.
Run the following command to start the Nginx Proxy Manager API:
docker run -p 9090:8080 -e SWAGGER_JSON=/foo/api.swagger.json -v /data/websoft9/appmanage_new/docs/:/foo swaggerapi/swagger-ui
Attributes:
base_url (str): The base URL of the Nginx Proxy Manager API.
api_token (str): The API Token to use for authorization.
api (APIHelper): API helper
Methods:
get_token(identity: str,secret: str): Request a new access token
refresh_token(): Refresh your access token
get_proxy_hosts(): Get all proxy hosts
create_proxy_host(domain_names: List[str],forward_scheme:str,forward_host: str,forward_port: int ,advanced_config: str): Create a new proxy host
update_proxy_host(proxy_id: int,domain_names: List[str],forward_scheme:str,forward_host: str,forward_port: int ,advanced_config: str): Update an existing proxy host
delete_proxy_host(proxy_id: int): Delete a proxy host
get_token(identity: str, secret: str) -> Response: Request a new access token
get_proxy_hosts() -> Response: Get all proxy hosts
create_proxy_host(domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str) -> Response: Create a new proxy host
update_proxy_host(proxy_id: int, domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str) -> Response: Update an existing proxy host
delete_proxy_host(proxy_id: int) -> Response: Delete a proxy host
"""
def __init__(self):
"""
Initialize the NginxProxyManagerAPI instance.
"""
self.base_url = ConfigManager().get_value("nginx_proxy_manager", "base_url")
self.api_token = None
self.api = APIHelper(
ConfigManager().get_value("nginx_proxy_manager", "base_url"),
{
"Content-Type": "application/json",
},
)
def get_token(self, identity: str, secret: str) -> Union[dict, None]:
def set_token(self, api_token: str):
"""
Set API token
Args:
api_token (str): API token
"""
self.api.headers["Authorization"] = f"Bearer {api_token}"
def get_token(self, identity: str, secret: str):
"""
Request a new access token
Args:
identity (string): user account with an email address
secret (string): user password
identity (str): Identity
secret (str): Secret
Returns:
dict or None: A dictionary containing token-related information if successful,otherwise None. The dictionary structure is as follows:
If successful:
{
"expires": str, # Expiry timestamp of the token
"token": str # The access token
}
If unsuccessful:
None
"""
url = f"{self.base_url}/api/tokens"
headers = {
'Content-Type': 'application/json'
}
json = {
"identity": identity,
"scope": "user",
"secret": secret
}
response = requests.post(url, json=json, headers=headers)
if response.status_code == 200:
return response.json()
else:
return None
def refresh_token(self) -> Union[dict, None]:
Response: Response from Nginx Proxy Manager API
"""
Refresh your access token
return self.api.post(
path="tokens",
headers={"Content-Type": "application/json"},
json={"identity": identity, "scope": "user", "secret": secret},
)
def get_proxy_hosts(self):
"""
get all proxy hosts
Returns:
dict or None: A dictionary containing token-related information if successful,otherwise None. The dictionary structure is as follows:
If successful:
{
"expires": str, # Expiry timestamp of the token
"token": str # The access token
}
If unsuccessful:
None
Response: Response from Nginx Proxy Manager API
"""
url = f"{self.base_url}/api/tokens"
headers = {
'Content-Type': 'application/json',
"Authorization": f"Bearer {self.api_token}"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
return None
return self.api.get(
path="nginx/proxy-hosts", params={"expand": "owner,access_list,certificate"}
)
def get_proxy_hosts(self) -> Union[List[dict], None]:
"""
Get all proxy hosts
Returns:
list or None: If the retrieval is successful, returns a list of dictionaries containing proxy host information, where each dictionary includes:
- "proxy_id": The ID of the proxy host.
- "forward_host": The target host name of the proxy.
- "domain_names": A list of domain names associated with the proxy host.
Returns None if the retrieval fails.
"""
url = f"{self.base_url}/api/nginx/proxy-hosts"
params = {"expand": "owner,access_list,certificate"}
headers = {
'Content-Type': 'application/json',
"Authorization": f"Bearer {self.api_token}"
}
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
proxy_hosts = response.json()
result_dict = [
{
"proxy_id": proxy["id"],
"forward_host": proxy["forward_host"],
"domain_names": proxy["domain_names"]
}
for proxy in proxy_hosts
]
return result_dict
else:
return None
def create_proxy_host(self, domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str) -> Union[dict, None]:
def create_proxy_host(
self,
domain_names: List[str],
forward_scheme: str,
forward_host: str,
forward_port: int,
advanced_config: str,
):
"""
Create a new proxy host
Args:
domain_names (List[str]): List of domain names associated with the proxy host.
forward_scheme (str): The scheme (HTTP or HTTPS) for forwarding traffic.
forward_host (str): The target host to which traffic will be forwarded.
forward_port (int): The port on the target host to which traffic will be forwarded.
advanced_config (str): Advanced configuration options for the proxy host.
domain_names (List[str]): Domain names
forward_scheme (str): Forward scheme
forward_host (str): Forward host
forward_port (int): Forward port
advanced_config (str): Advanced config
Returns:
dict or None: If the proxy host creation is successful,
returns a dictionary containing information about the created proxy host with the following fields:
- "proxy_id": The id of the created proxy host.
- "forward_host": The target host name of the proxy.
- "domain_names": A list of domain names associated with the proxy host.
Returns None if the proxy host creation fails .
Response: Response from Nginx Proxy Manager API
"""
url = f"{self.base_url}/api/nginx/proxy-hosts"
json = {
"domain_names": domain_names,
"forward_scheme": forward_scheme,
"forward_host": forward_host,
"forward_port": forward_port,
"access_list_id": "0",
"certificate_id": 0,
"meta": {
"letsencrypt_agree": False,
"dns_challenge": False
},
"advanced_config": advanced_config,
"block_exploits": False,
"caching_enabled": False,
"allow_websocket_upgrade": False,
"http2_support": False,
"hsts_enabled": False,
"hsts_subdomains": False,
"ssl_forced": False,
"locations": [],
}
headers = {
'Content-Type': 'application/json',
"Authorization": f"Bearer {self.api_token}"
}
response = requests.post(url, json=json, headers=headers)
if response.status_code == 201:
proxy_hosts = response.json()
proxy_id = proxy_hosts.get("id")
domain_names = proxy_hosts.get("domain_names")
forward_host = proxy_hosts.get("forward_host")
result_dict = {
"proxy_id": proxy_id,
return self.api.post(
path="nginx/proxy-hosts",
json={
"domain_names": domain_names,
"forward_scheme": forward_scheme,
"forward_host": forward_host,
"domain_names": domain_names
}
return result_dict
else:
return None
"forward_port": forward_port,
"access_list_id": "0",
"certificate_id": 0,
"meta": {"letsencrypt_agree": False, "dns_challenge": False},
"advanced_config": advanced_config,
"block_exploits": False,
"caching_enabled": False,
"allow_websocket_upgrade": False,
"http2_support": False,
"hsts_enabled": False,
"hsts_subdomains": False,
"ssl_forced": False,
"locations": [],
},
)
def update_proxy_host(self, proxy_id: int, domain_names: List[str], forward_scheme: str, forward_host: str, forward_port: int, advanced_config: str) -> Union[dict, None]:
def update_proxy_host(
self,
proxy_id: int,
domain_names: List[str],
forward_scheme: str,
forward_host: str,
forward_port: int,
advanced_config: str,
):
"""
Update an existing proxy host.
Update an existing proxy host
Args:
proxy_id (int): The ID of the proxy host to be updated.
domain_names (List[str]): List of updated domain names associated with the proxy host.
forward_scheme (str): The updated scheme (HTTP or HTTPS) for forwarding traffic.
forward_host (str): The updated target host to which traffic will be forwarded.
forward_port (int): The updated port on the target host to which traffic will be forwarded.
advanced_config (str): Updated advanced configuration options for the proxy host.
proxy_id (int): Proxy ID
domain_names (List[str]): Domain names
forward_scheme (str): Forward scheme
forward_host (str): Forward host
forward_port (int): Forward port
advanced_config (str): Advanced config
Returns:
dict or None: If the proxy host update is successful,
returns a dictionary containing information about the updated proxy host with the following fields:
- "proxy_id": The ID of the updated proxy host.
- "forward_host": The target host name of the proxy after the update.
- "domain_names": A list of updated domain names associated with the proxy host.
Returns None if the proxy host update fails.
Response: Response from Nginx Proxy Manager API
"""
url = f"{self.base_url}/api/nginx/proxy-hosts/{proxy_id}"
headers = {
'Content-Type': 'application/json',
"Authorization": f"Bearer {self.api_token}"
}
json = {
"domain_names": domain_names,
"forward_scheme": forward_scheme,
"forward_host": forward_host,
"forward_port": forward_port,
"access_list_id": "0",
"certificate_id": 0,
"meta": {
"letsencrypt_agree": False,
"dns_challenge": False
},
"advanced_config": advanced_config,
"block_exploits": False,
"caching_enabled": False,
"allow_websocket_upgrade": False,
"http2_support": False,
"hsts_enabled": False,
"hsts_subdomains": False,
"ssl_forced": False,
"locations": [],
}
response = requests.put(url, json=json, headers=headers)
if response.status_code == 200:
proxy_hosts = response.json()
proxy_id = proxy_hosts.get("id")
domain_names = proxy_hosts.get("domain_names")
forward_host = proxy_hosts.get("forward_host")
result_dict = {
"proxy_id": proxy_id,
return self.api.put(
path=f"nginx/proxy-hosts/{proxy_id}",
json={
"domain_names": domain_names,
"forward_scheme": forward_scheme,
"forward_host": forward_host,
"domain_names": domain_names
}
return result_dict
else:
return None
"forward_port": forward_port,
"access_list_id": "0",
"certificate_id": 0,
"meta": {"letsencrypt_agree": False, "dns_challenge": False},
"advanced_config": advanced_config,
"block_exploits": False,
"caching_enabled": False,
"allow_websocket_upgrade": False,
"http2_support": False,
"hsts_enabled": False,
"hsts_subdomains": False,
"ssl_forced": False,
"locations": [],
},
)
def delete_proxy_host(self, proxy_id: int) -> Union[bool, None]:
def delete_proxy_host(self, proxy_id: int):
"""
Delete a proxy host
Args:
proxy_id (int): The ID of the proxy host to be deleted.
proxy_id (int): Proxy ID
Returns:
bool or None: Returns the response object if the proxy host is successfully deleted ,
indicating a successful deletion. Returns None if the deletion fails .
Response: Response from Nginx Proxy Manager API
"""
url = f"{self.base_url}/api/nginx/proxy-hosts/{proxy_id}"
headers = {
'Content-Type': 'application/json',
"Authorization": f"Bearer {self.api_token}"
}
response = requests.delete(url, headers=headers)
if response.status_code == 200:
return response
return None
return self.api.delete(path=f"nginx/proxy-hosts/{proxy_id}")

View File

@ -1,3 +1,223 @@
import json
from src.core.apiHelper import APIHelper
from src.core.config import ConfigManager
class PortainerAPI:
def __init__(self) -> None:
pass
"""
This class is used to interact with Portainer API
The Portainer API documentation can be found at: https://app.swaggerhub.com/apis/portainer/portainer-ce/2.19.0
Attributes:
api (APIHelper): API helper
Methods:
get_jwt_token(username: str, password: str) -> Response): Get JWT token
get_endpoints() -> Response: Get endpoints
get_stacks(endpointID: int) -> Response: Get stacks
get_stack_by_id(stackID: int) -> Response: Get stack by ID
remove_stack(stackID: int,endPointID: int) -> Response: Remove a stack
create_stack_standlone_repository(app_name: str, endpointId: int,repositoryURL:str) -> Response: Create a stack from a standalone repository
start_stack(stackID: int, endpointId: int) -> Response: Start a stack
stop_stack(stackID: int, endpointId: int) -> Response: Stop a stack
redeploy_stack(stackID: int, endpointId: int) -> Response: Redeploy a stack
"""
def __init__(self):
"""
Initialize the PortainerAPI instance
"""
self.api = APIHelper(
ConfigManager().get_value("portainer", "base_url"),
{
"Content-Type": "application/json",
},
)
def set_jwt_token(self, jwt_token):
"""
Set JWT token
Args:
jwt_token (str): JWT token
"""
self.api.headers["Authorization"] = f"Bearer {jwt_token}"
def get_jwt_token(self, username: str, password: str):
"""
Get JWT token
Args:
username (str): Username
password (str): Password
Returns:
Response: Response from Portainer API
"""
return self.api.post(
path="auth",
headers={"Content-Type": "application/json"},
json={
"password": password,
"username": username,
},
)
def get_endpoints(self,start: int = 0,limit: int = 1000):
"""
Get endpoints
Returns:
Response: Response from Portainer API
"""
return self.api.get(
path="endpoints",
params={
"start": start,
"limit": limit,
},
)
def get_endpoint_by_id(self, endpointID: int):
"""
Get endpoint by ID
Args:
endpointID (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.get(path=f"endpoints/{endpointID}")
def create_endpoint(self, name: str, EndpointCreationType: int = 1):
"""
Create an endpoint
Args:
name (str): Endpoint name
EndpointCreationType (int, optional): Endpoint creation type:
1 (Local Docker environment), 2 (Agent environment), 3 (Azure environment), 4 (Edge agent environment) or 5 (Local Kubernetes Environment) ,Defaults to 1.
Returns:
Response: Response from Portainer API
"""
return self.api.post(
path="endpoints",
params={"Name": name, "EndpointCreationType": EndpointCreationType},
)
def get_stacks(self, endpointID: int):
"""
Get stacks
Args:
endpointID (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.get(
path="stacks",
params={
"filters": json.dumps(
{"EndpointID": endpointID, "IncludeOrphanedStacks": True}
)
},
)
def get_stack_by_id(self, stackID: int):
"""
Get stack by ID
Args:
stackID (int): Stack ID
Returns:
Response: Response from Portainer API
"""
return self.api.get(path=f"stacks/{stackID}")
def remove_stack(self, stackID: int, endPointID: int):
"""
Remove a stack
Args:
stackID (int): Stack ID
endPointID (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.delete(
path=f"stacks/{stackID}", params={"endpointId": endPointID}
)
def create_stack_standlone_repository(self, stack_name: str, endpointId: int, repositoryURL: str):
"""
Create a stack from a standalone repository
Args:
stack_name (str): Stack name
endpointId (int): Endpoint ID
repositoryURL (str): Repository URL
Returns:
Response: Response from Portainer API
"""
return self.api.post(
path="stacks/create/standalone/repository",
params={"endpointId": endpointId},
json={
"Name": stack_name,
"RepositoryURL": repositoryURL,
"ComposeFile": "docker-compose.yml",
},
)
def start_stack(self, stackID: int, endpointId: int):
"""
Start a stack
Args:
stackID (int): Stack ID
endpointId (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.post(
path=f"stacks/{stackID}/start", params={"endpointId": endpointId}
)
def stop_stack(self, stackID: int, endpointId: int):
"""
Stop a stack
Args:
stackID (int): Stack ID
endpointId (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.post(
path=f"stacks/{stackID}/stop", params={"endpointId": endpointId}
)
def redeploy_stack(self, stackID: int, endpointId: int):
"""
Redeploy a stack
Args:
stackID (int): Stack ID
endpointId (int): Endpoint ID
Returns:
Response: Response from Portainer API
"""
return self.api.post(
path=f"stacks/{stackID}/redeploy", params={"endpointId": endpointId}
)

View File

@ -1,12 +1,47 @@
from fastapi import FastAPI
import json
from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from src.api.v1.routers import app as api_app
from src.api.v1.routers import settings as api_settings
from src.api.v1.routers import proxy as api_proxy
from src.core.exception import CustomException
from src.schemas.errorResponse import ErrorResponse
app = FastAPI(
title="FastAPI Template",
description="FastAPI Template 123",
version="0.0.1"
title="AppManae API",
# summary="[ Base URL: /api/v1 ]",
description="This documentation describes the AppManage API.",
version="0.0.1",
)
# remove 422 responses
@app.on_event("startup")
async def remove_422_responses():
openapi_schema = app.openapi()
for path, path_item in openapi_schema["paths"].items():
for method, operation in path_item.items():
operation["responses"].pop("422", None)
app.openapi_schema = openapi_schema
#custom error handler
@app.exception_handler(CustomException)
async def custom_exception_handler(request, exc: CustomException):
return JSONResponse(
status_code=exc.status_code,
content=ErrorResponse(message=exc.message, details=exc.details).model_dump(),
)
# 422 error handler:set 422 response to 400
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
#errors = {err['loc'][1]: err['msg'] for err in exc.errors()}
errors = ", ".join(f"{err['loc'][1]}: {err['msg']}" for err in exc.errors())
return JSONResponse(
status_code=400,
content=ErrorResponse(message="Request Validation Error", details=errors).model_dump(),
)
app.include_router(api_app.router,tags=["apps"])
app.include_router(api_settings.router,tags=["settings"])
app.include_router(api_proxy.router,tags=["proxys"])

View File

@ -1,24 +1,29 @@
import re
from typing import Optional, List
from pydantic import BaseModel, Field, validator
from src.core.exception import CustomException
class Edition(BaseModel):
dist: str = Field("community", description="The edition of the app",examples=["community","enterprise"])
version: str = Field(..., description="The version of the app",examples=["1.0.0","latest"])
class appInstall(BaseModel):
app_name: str = Field(..., description="The name of the app",examples=["wordpress","mysql"])
app_name: str = Field(...,description="The name of the app",examples=["wordpress","mysql"])
edition: Edition = Field(..., description="The edition of the app", example={"dist":"community","version":"1.0.0"})
app_id: str = Field(..., pattern="^[a-z][a-z0-9]{1,19}$", description="The custom identifier for the application. It must be a combination of 2 to 20 lowercase letters and numbers, and cannot start with a number.", example="wordpress")
app_id: str = Field(...,description="The custom identifier for the application. It must be a combination of 2 to 20 lowercase letters and numbers, and cannot start with a number.", example="wordpress")
domain_names: Optional[List[str]] = Field(None, description="The domain names for the app, not exceeding 2, one wildcard domain and one custom domain.", example=["wordpress.example.com","mysql.example.com"])
default_domain: Optional[str] = Field(None, description="The default domain for the app, sourced from domain_names. If not set, the custom domain will be used automatically.", example="wordpress.example.com")
@validator('app_id', check_fields=False)
def validate_app_id(cls, v):
pattern = re.compile("^[a-z][a-z0-9]{1,19}$")
if not pattern.match(v):
raise CustomException(400,"Invalid Request","The app_id must be a combination of 2 to 20 lowercase letters and numbers, and cannot start with a number.")
return v
@validator('domain_names', check_fields=False)
def validate_domain_names(cls, v):
if v and len(v) > 2:
raise ValueError('domain_names should not exceed 2.')
return v
class Config:
title = "App Installation"
description = "App Installation Payload"
raise CustomException(400, "Invalid Request","The domain_names not exceeding 2")
return v

View File

@ -0,0 +1,5 @@
from pydantic import BaseModel
class ErrorResponse(BaseModel):
message: str
details: str

View File

@ -0,0 +1,108 @@
import json
import os
from src.core.config import ConfigManager
from src.core.exception import CustomException
from src.schemas.appInstall import appInstall
from src.services.gitea_manager import GiteaManager
from src.services.portainer_manager import PortainerManager
from src.core.logger import logger
class AppManger:
def install_app(self,appInstall: appInstall, endpointId: int = None):
portainerManager = PortainerManager()
# if endpointId is None, get the local endpointId
if endpointId is None:
try:
endpointId = portainerManager.get_local_endpoint_id()
except CustomException:
raise
except Exception:
raise CustomException()
else :
# validate the endpointId is exists
is_endpointId_exists = portainerManager.check_endpoint_exists(endpointId)
if not is_endpointId_exists:
raise CustomException(
status_code=404,
message="Not found",
details="EndpointId Not Found"
)
# validate the app_name and app_version
app_name = appInstall.app_name
app_version = appInstall.edition.version
self._check_appName_and_appVersion(app_name,app_version)
# validate the app_id
app_id = appInstall.app_id
self._check_appId(app_id,endpointId)
# validate the domain_names
def _check_appName_and_appVersion(self,app_name:str, app_version:str):
"""
Check the app_name and app_version is exists in docker library
Args:
app_name (str): App Name
app_version (str): App Version
Raises:
CustomException: If the app_name or app_version is not exists in docker library
"""
library_path = ConfigManager().get_value("docker_library", "path")
if not os.path.exists(f"{library_path}/{app_name}"):
logger.error(f"When install app:{app_name}, the app is not exists in docker library")
raise CustomException(
status_code=400,
message="App Name Not Supported",
details=f"app_name:{app_name} not supported",
)
else:
with open(f"{library_path}/{app_name}/variables.json", "r") as f:
variables = json.load(f)
community_editions = [d for d in variables["edition"] if d["dist"] == "community"]
if not any(
app_version in d["version"] for d in community_editions
):
logger.error(f"When install app:{app_name}, the app version:{app_version} is not exists in docker library")
raise CustomException(
status_code=400,
message="App Version Not Supported",
details=f"app_version:{app_version} not supported",
)
def _check_appId(self,app_id:str,endpointId:int):
# validate the app_id is exists in gitea
giteaManager = GiteaManager()
is_repo_exists = giteaManager.check_repo_exists(app_id)
if is_repo_exists:
logger.error(f"When install app,the app_id:{{app_id}} is exists in gitea")
raise CustomException(
status_code=400,
message="App_id Conflict",
details=f"App_id:{app_id} Is Exists In Gitea"
)
# validate the app_id is exists in portainer
portainerManager = PortainerManager()
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
if is_stack_exists:
logger.error(f"When install app, the app_id:{app_id} is exists in portainer")
raise CustomException(
status_code=400,
message="App_id Conflict",
details=f"app_id:{app_id} is exists in portainer"
)

View File

@ -0,0 +1,109 @@
import base64
import os
from git import Repo
from src.core.logger import logger
from src.core.config import ConfigManager
from src.core.exception import CustomException
from src.external.gitea_api import GiteaAPI
class GiteaManager:
def __init__(self):
try:
self.gitea = GiteaAPI()
self._set_basic_auth_credential()
except Exception as e:
logger.error(f"Init Gitea API Error:{e}")
raise CustomException()
def _set_basic_auth_credential(self):
"""
Set basic auth credential
"""
username = ConfigManager().get_value("gitea", "user_name")
password = ConfigManager().get_value("gitea", "user_pwd")
credentials = f"{username}:{password}"
credentials_encoded = base64.b64encode(credentials.encode()).decode()
self.gitea.set_credential(credentials_encoded)
def check_repo_exists(self,repo_name: str):
"""
Check repo is exist
Args:
repo_name (str): Repository name
Returns:
bool: True if repo is exist, False if repo is not exist, raise exception if error
"""
response = self.gitea.get_repo_by_name(repo_name)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
logger.error(f"Error validate repo is exist from gitea: {response.text}")
raise CustomException()
def create_repo(self, repo_name: str):
"""
Create repository
Args:
repo_name (str): Repository name
Returns:
bool: True if repo is created, raise exception if repo is not created
"""
response = self.gitea.create_repo(repo_name)
if response.status_code == 201:
return True
else:
logger.error(f"Error create repo from gitea: {response.text}")
raise CustomException()
def create_local_repo_and_push_remote(self, local_git_path: str,remote_git_url: str):
if os.path.exists(local_git_path):
try:
repo = Repo.init(local_git_path)
repo.create_head('main')
repo.git.add(A=True)
repo.index.commit("Initial commit")
origin = repo.create_remote('origin',remote_git_url)
origin.push(refspec='main:main')
except Exception as e:
logger.error(f"Error create local repo and push remote: {e}")
raise CustomException()
else:
logger.error(f"Error repo path not exist: {local_git_path}")
raise CustomException()
def get_file_content_from_repo(self, repo_name: str, file_path: str):
response = self.gitea.get_file_content_from_repo(repo_name, file_path)
if response.status_code == 200:
return {
"name": response.json()["name"],
"encoding": response.json()["encoding"],
"sha": response.json()["sha"],
"content": response.json()["content"],
}
else:
logger.error(f"Error get file content from repo from gitea: {response.text}")
raise CustomException()
def update_file_in_repo(self, repo_name: str, file_path: str, content: str,sha: str):
response = self.gitea.update_file_content_in_repo(repo_name, file_path, content, sha)
if response.status_code == 201:
return True
else:
logger.error(f"Error update file in repo from gitea: {response.text}")
raise CustomException()
def remove_repo(self, repo_name: str):
response = self.gitea.remove_repo(repo_name)
if response.status_code == 204:
return True
else:
logger.error(f"Error remove repo from gitea: {response.text}")
raise CustomException()

View File

@ -1,23 +1,137 @@
from fastapi import logger
import json
import time
import jwt
import keyring
from src.core.config import ConfigManager
from src.core.exception import CustomException
from src.external.portainer_api import PortainerAPI
from src.core.logger import logger
class PortainerManager:
def __init__(self, portainer_url, portainer_username, portainer_password):
"""
Init Portainer Manager
Args:
portainer_url (str): The url of the portainer
portainer_username (str): The username of the portainer
portainer_password (str): The password of the portainer
"""
self.portainer_url = portainer_url
self.portainer_username = portainer_username
self.portainer_password = portainer_password
def __init__(self):
try:
self.portainer = PortainerAPI(self.portainer_url)
self._init_portainer_token()
self.portainer = PortainerAPI()
self._set_portainer_token()
except Exception as e:
logger.error(f"Init Portainer API Error:{e}")
raise e
raise CustomException()
def _set_portainer_token(self):
service_name = "portainer"
token_name = "user_token"
# Try to get token from keyring
try:
jwt_token = keyring.get_password(service_name, token_name)
except Exception as e:
jwt_token = None
# if the token is got from keyring,vaildate the exp time
if jwt_token is not None:
try:
decoded_jwt = jwt.decode(jwt_token, options={"verify_signature": False})
exp_timestamp = decoded_jwt['exp']
# if the token is not expired, return it
if int(exp_timestamp) - int(time.time()) > 3600:
self.portainer.set_jwt_token(jwt_token)
return
except Exception as e:
logger.error(f"Decode Portainer's Token Error:{e}")
raise CustomException()
# if the token is expired or not got from keyring, get a new one
try:
userName = ConfigManager().get_value("portainer", "user_name")
userPwd = ConfigManager().get_value("portainer", "user_pwd")
except Exception as e:
logger.error(f"Get Portainer's UserName and UserPwd Error:{e}")
raise CustomException()
token_response = self.portainer.get_jwt_token(userName, userPwd)
if token_response.status_code == 200:
jwt_token = token_response.json()["jwt"]
self.portainer.set_jwt_token(jwt_token)
# set new token to keyring
try:
keyring.set_password(service_name, token_name, jwt_token)
except Exception as e:
logger.error(f"Set Portainer's Token To Keyring Error:{e}")
raise CustomException()
else:
logger.error(f"Error Calling Portainer API: {token_response.status_code}:{token_response.text}")
raise CustomException()
def get_local_endpoint_id(self):
"""
Get local endpoint id: the endpoint id of the local docker engine
if there are multiple local endpoints, return the one with the smallest id
Returns:
str: local endpoint id
"""
response = self.portainer.get_endpoints()
if response.status_code == 200:
endpoints = response.json()
local_endpoint = None
for endpoint in endpoints:
if endpoint["URL"] == "unix:///var/run/docker.sock":
if local_endpoint is None:
local_endpoint = endpoint
elif endpoint["Id"] < local_endpoint["Id"]:
local_endpoint = endpoint
if local_endpoint is not None:
return local_endpoint["Id"]
else:
logger.error(f"Error get local endpoint id from portainer: {response.text}")
raise CustomException()
else:
logger.error(f"Error get local endpoint id from portainer: {response.text}")
raise CustomException()
def check_endpoint_exists(self, endpoint_id: str):
response = self.portainer.get_endpoint_by_id(endpoint_id)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
logger.error(f"Error validate endpoint is exist from portainer: {response.text}")
raise CustomException()
def check_stack_exists(self, stack_name: str, endpoint_id: str):
response = self.portainer.get_stacks(endpoint_id)
if response.status_code == 200:
stacks = response.json()
for stack in stacks:
if stack["Name"] == stack_name:
return True
return False
else:
logger.error(f"Error validate stack is exist from portainer: {response.text}")
raise CustomException()
def create_stack_from_repository(self, stack_name: str, endpoint_id: str,repositoryURL : str):
response = self.portainer.create_stack_standlone_repository(stack_name, endpoint_id,repositoryURL)
if response.status_code == 200:
return True
else:
logger.error(f"Error create stack from portainer: {response.text}")
raise CustomException()
def get_stacks(self, endpoint_id: str):
response = self.portainer.get_stacks(endpoint_id)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Error get stacks from portainer: {response.text}")
raise CustomException()
def get_stack_by_id(self, stack_id: str):
response = self.portainer.get_stack_by_id(stack_id)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Error get stack by id from portainer: {response.text}")
raise CustomException()

View File

@ -1,7 +1,8 @@
import time
import keyring
import json
from src.core.config import ConfigManager
from src.core.exception import CustomException
from src.core.logger import logger
from src.external.nginx_proxy_manager_api import NginxProxyManagerAPI
@ -9,70 +10,95 @@ from src.external.nginx_proxy_manager_api import NginxProxyManagerAPI
class ProxyManager:
def __init__(self, app_name):
"""
Init Domain Manager
Initialize the ProxyManager instance.
Args:
app_name (str): The name of the app
"""
self.app_name = app_name
try:
self.nginx = NginxProxyManagerAPI()
self._init_nginx_token()
self._set_nginx_token()
except Exception as e:
logger.error(f"Init Nginx Proxy Manager API Error:{e}")
raise e
raise CustomException()
def _init_nginx_token(self):
def _set_nginx_token(self):
"""
Get Nginx Proxy Manager's Token From Keyring, if the token is expired or not got from keyring, get a new one and set it to keyring
"""
service_name = 'nginx_proxy_manager'
token_name = "nginx_token"
service_name = "nginx_proxy_manager"
token_name = "user_token"
# Try to get token from keyring
try:
token_json_str = keyring.get_password(service_name, token_name)
except Exception as e:
logger.error(f"Get Nginx Proxy Manager's Token From Keyring Error:{e}")
token_json_str = None
# if the token is got from keyring, parse it
if token_json_str is not None:
token_json = json.loads(token_json_str)
expires = token_json.get("expires")
api_token = token_json.get("token")
try:
token_json = json.loads(token_json_str)
expires = token_json.get("expires")
api_token = token_json.get("token")
# if the token is not expired, return it
if int(expires) - int(time.time()) > 3600:
self.nginx.api_token = api_token
return
# if the token is not expired, return it
if int(expires) - int(time.time()) > 3600:
self.nginx.set_token(api_token)
return
except Exception as e:
logger.error(f"Parse Nginx Proxy Manager's Token Error:{e}")
raise CustomException()
# if the token is expired or not got from keyring, get a new one
try:
nginx_tokens = self.nginx.get_token("userName","userPwd")
userName = ConfigManager().get_value("nginx_proxy_manager", "user_name")
userPwd = ConfigManager().get_value("nginx_proxy_manager", "user_pwd")
except Exception as e:
logger.error(f"Get Nginx Proxy Manager's Token Error:{e}")
return
logger.error(f"Get Nginx Proxy Manager's UserName and UserPwd Error:{e}")
raise CustomException()
nginx_tokens = self.nginx.get_token(userName, userPwd)
if nginx_tokens.status_code == 200:
nginx_tokens = nginx_tokens.json()
expires = nginx_tokens.get("expires")
api_token = nginx_tokens.get("token")
expires = nginx_tokens.get("expires")
api_token = nginx_tokens.get("token")
self.nginx.set_token(api_token)
self.nginx.api_token = api_token
token_json = {"expires": expires, "token": api_token}
token_json = {
"expires": expires,
"token": api_token
}
# set new token to keyring
try:
keyring.set_password(service_name, token_name, json.dumps(token_json))
except Exception as e:
logger.error(f"Set Nginx Proxy Manager's Token To Keyring Error:{e}")
raise CustomException()
else:
raise CustomException()
def check_proxy_host_exists(self,domain_names: list[str]):
response = self.nginx.get_proxy_hosts()
if response.status_code == 200:
proxy_hosts = response.json()
for proxy_host in proxy_hosts:
if proxy_host["domain_names"] == domain_names:
return True
return False
else:
raise CustomException()
# set new token to keyring
def create_proxy_for_app(self,domain_names: list[str],forward_host: str,forward_port: int,advanced_config: str = "",forward_scheme: str = "http"):
try:
keyring.set_password(service_name, token_name, json.dumps(token_json))
except Exception as e:
logger.error(f"Set Nginx Proxy Manager's Token To Keyring Error:{e}")
return
def create_proxy_for_app(self, domain_names:list[str],forward_port:int,advanced_config:str="",forward_scheme:str="http"):
try:
self.nginx.create_proxy_host(domain_names=domain_names,forward_scheme=forward_scheme,forward_port=forward_port,advanced_config=advanced_config)
self.nginx.create_proxy_host(
domain_names=domain_names,
forward_scheme=forward_scheme,
forward_host=forward_host,
forward_port=forward_port,
advanced_config=advanced_config,
)
except Exception as e:
logger.error(f"Create Proxy Host For {self.app_name} Error {e}")
raise e
raise e

View File

@ -0,0 +1,32 @@
import string
import random
def generate_strong_password():
lowercase_letters = string.ascii_lowercase # all lowercase letters
uppercase_letters = string.ascii_uppercase # all uppercase letters
digits = string.digits # all digits
special_symbols = "`$%()[]{},.*+-:;<>?_~/|\"" # all special symbols
# get 4 random characters from each category
password = [
random.choice(lowercase_letters),
random.choice(uppercase_letters),
random.choice(digits),
random.choice(special_symbols)
]
# get 12 random characters from all categories
all_characters = lowercase_letters + uppercase_letters + digits + special_symbols
for i in range(12): # 12 characters
password.append(random.choice(all_characters)) # get a random character from all characters
# shuffle the password list
random.shuffle(password)
# convert the list to a string
password = ''.join(password)
return password
if __name__ == "__main__":
print(generate_strong_password())