mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-23 07:30:23 +08:00
92 lines
1.9 KiB
Go
92 lines
1.9 KiB
Go
|
package kis
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"kis-flow/log"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
var _poolOnce sync.Once
|
||
|
|
||
|
// kisPool 用于管理全部的Function和Flow配置的池子
|
||
|
type kisPool struct {
|
||
|
fnRouter funcRouter // 全部的Function管理路由
|
||
|
fnLock sync.RWMutex // fnRouter 锁
|
||
|
|
||
|
flowRouter flowRouter // 全部的flow对象
|
||
|
flowLock sync.RWMutex // flowRouter 锁
|
||
|
}
|
||
|
|
||
|
// 单例
|
||
|
var _pool *kisPool
|
||
|
|
||
|
// Pool 单例构造
|
||
|
func Pool() *kisPool {
|
||
|
_poolOnce.Do(func() {
|
||
|
//创建kisPool对象
|
||
|
_pool = new(kisPool)
|
||
|
|
||
|
// fnRouter初始化
|
||
|
_pool.fnRouter = make(funcRouter)
|
||
|
|
||
|
// flowRouter初始化
|
||
|
_pool.flowRouter = make(flowRouter)
|
||
|
})
|
||
|
|
||
|
return _pool
|
||
|
}
|
||
|
|
||
|
func (pool *kisPool) AddFlow(name string, flow Flow) {
|
||
|
pool.flowLock.Lock()
|
||
|
defer pool.flowLock.Unlock()
|
||
|
|
||
|
if _, ok := pool.flowRouter[name]; !ok {
|
||
|
pool.flowRouter[name] = flow
|
||
|
} else {
|
||
|
errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
|
||
|
panic(errString)
|
||
|
}
|
||
|
|
||
|
log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
|
||
|
}
|
||
|
|
||
|
func (pool *kisPool) GetFlow(name string) Flow {
|
||
|
pool.flowLock.RLock()
|
||
|
defer pool.flowLock.RUnlock()
|
||
|
|
||
|
if flow, ok := pool.flowRouter[name]; ok {
|
||
|
return flow
|
||
|
} else {
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
|
||
|
func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
||
|
pool.fnLock.Lock()
|
||
|
defer pool.fnLock.Unlock()
|
||
|
|
||
|
if _, ok := pool.fnRouter[fnName]; !ok {
|
||
|
pool.fnRouter[fnName] = f
|
||
|
} else {
|
||
|
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
|
||
|
panic(errString)
|
||
|
}
|
||
|
|
||
|
log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
|
||
|
}
|
||
|
|
||
|
// CallFunction 调度 Function
|
||
|
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
|
||
|
|
||
|
if f, ok := pool.fnRouter[fnName]; ok {
|
||
|
return f(ctx, flow)
|
||
|
}
|
||
|
|
||
|
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
|
||
|
|
||
|
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
|
||
|
}
|