From dd634e6921fc34b0c71c962dfcba3033b8aca3fa Mon Sep 17 00:00:00 2001 From: aceld Date: Wed, 3 Apr 2024 12:36:34 +0800 Subject: [PATCH] change CaaS Function Type, add return value:interface{} --- conn/kis_connector.go | 12 ++++++++---- kis/connector.go | 2 +- kis/pool.go | 4 ++-- kis/router.go | 2 +- test/caas/caas_demo1.go | 4 ++-- test/faas/faas_demo2.go | 2 +- 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/conn/kis_connector.go b/conn/kis_connector.go index 4b22897..9ce1baa 100644 --- a/conn/kis_connector.go +++ b/conn/kis_connector.go @@ -50,12 +50,16 @@ func (conn *KisConnector) Init() error { } // Call 调用Connector 外挂存储逻辑的读写操作 -func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error { - if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil { - return err +func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) (interface{}, error) { + var result interface{} + var err error + + result, err = kis.Pool().CallConnector(ctx, flow, conn, args) + if err != nil { + return nil, err } - return nil + return result, nil } func (conn *KisConnector) GetName() string { diff --git a/kis/connector.go b/kis/connector.go index f0829e9..fd781f5 100644 --- a/kis/connector.go +++ b/kis/connector.go @@ -9,7 +9,7 @@ type Connector interface { // Init 初始化Connector所关联的存储引擎链接等 Init() error // Call 调用Connector 外挂存储逻辑的读写操作 - Call(ctx context.Context, flow Flow, args interface{}) error + Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error) // GetId 获取Connector的ID GetId() string // GetName 获取Connector的名称 diff --git a/kis/pool.go b/kis/pool.go index cdd5658..3607b80 100644 --- a/kis/pool.go +++ b/kis/pool.go @@ -212,7 +212,7 @@ func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c Caa } // CallConnector 调度 Connector -func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error { +func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) (interface{}, error) { pool.cLock.RLock() // 读锁 defer pool.cLock.RUnlock() fn := flow.GetThisFunction() @@ -225,7 +225,7 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode) - return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode)) + return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode)) } // GetFlows 得到全部的Flow diff --git a/kis/router.go b/kis/router.go index 8d12896..03d4d61 100644 --- a/kis/router.go +++ b/kis/router.go @@ -34,7 +34,7 @@ type connInitRouter map[string]ConnInit Connector Call */ // CaaS Connector的存储读取业务实现 -type CaaS func(context.Context, Connector, Function, Flow, interface{}) error +type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error) // connFuncRouter 通过FunctionName索引到CaaS回调存储业务的映射关系 // key: Function Name diff --git a/test/caas/caas_demo1.go b/test/caas/caas_demo1.go index 006e827..4abdcae 100644 --- a/test/caas/caas_demo1.go +++ b/test/caas/caas_demo1.go @@ -8,7 +8,7 @@ import ( // type CaaS func(context.Context, Connector, Function, Flow, interface{}) error -func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error { +func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) (interface{}, error) { fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n", flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode) @@ -16,5 +16,5 @@ func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, f fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args) - return nil + return nil, nil } diff --git a/test/faas/faas_demo2.go b/test/faas/faas_demo2.go index 305a026..e050e81 100644 --- a/test/faas/faas_demo2.go +++ b/test/faas/faas_demo2.go @@ -23,7 +23,7 @@ func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error { return err } - if conn.Call(ctx, flow, row) != nil { + if _, err := conn.Call(ctx, flow, row); err != nil { log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error()) return err }