mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-23 07:30:23 +08:00
fix: rename log interface
This commit is contained in:
parent
8e2f22fe52
commit
d7d1383a94
@ -46,7 +46,7 @@ func NewFuncConfig(
|
|||||||
Name: "unNamedSource",
|
Name: "unNamedSource",
|
||||||
}
|
}
|
||||||
source = &defaultSource
|
source = &defaultSource
|
||||||
log.Logger().InfoF("funcName NewConfig source is nil, use default unNamed Source.", "funcName", funcName)
|
log.Logger().Info("funcName NewConfig source is nil, use default unNamed Source.", "funcName", funcName)
|
||||||
}
|
}
|
||||||
config.Source = *source
|
config.Source = *source
|
||||||
|
|
||||||
@ -56,10 +56,10 @@ func NewFuncConfig(
|
|||||||
// FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
|
// FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
|
||||||
if mode == common.S || mode == common.L {
|
if mode == common.S || mode == common.L {
|
||||||
if option == nil {
|
if option == nil {
|
||||||
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
|
log.Logger().Error("Funcion S/L need option->Cid\n")
|
||||||
return nil
|
return nil
|
||||||
} else if option.CName == "" {
|
} else if option.CName == "" {
|
||||||
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
|
log.Logger().Error("Funcion S/L need option->Cid\n")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,8 +97,8 @@ func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().DebugFX(ctx, "=====>Flow Fork, ", "oldFlow.funcParams", flow.funcParams)
|
log.Logger().DebugX(ctx, "=====>Flow Fork, ", "oldFlow.funcParams", flow.funcParams)
|
||||||
log.Logger().DebugFX(ctx, "=====>Flow Fork, ", "newFlow.funcParams", newFlow.GetFuncParamsAllFuncs())
|
log.Logger().DebugX(ctx, "=====>Flow Fork, ", "newFlow.funcParams", newFlow.GetFuncParamsAllFuncs())
|
||||||
|
|
||||||
return newFlow
|
return newFlow
|
||||||
}
|
}
|
||||||
@ -257,7 +257,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
|
|||||||
|
|
||||||
// 得到当前Function要处理与的源数据
|
// 得到当前Function要处理与的源数据
|
||||||
if inputData, err := flow.getCurData(); err != nil {
|
if inputData, err := flow.getCurData(); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData", "err", err.Error())
|
log.Logger().ErrorX(ctx, "flow.Run(): getCurData", "err", err.Error())
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
flow.inPut = inputData
|
flow.inPut = inputData
|
||||||
@ -346,7 +346,7 @@ func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig
|
|||||||
if f, ok := flow.Funcs[funcName]; ok {
|
if f, ok := flow.Funcs[funcName]; ok {
|
||||||
return f.GetConfig()
|
return f.GetConfig()
|
||||||
} else {
|
} else {
|
||||||
log.Logger().ErrorF("GetFuncConfigByName(): Function not found", "FunctionName", funcName)
|
log.Logger().Error("GetFuncConfigByName(): Function not found", "FunctionName", funcName)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error {
|
|||||||
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
|
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().DebugFX(ctx, "====> After CommitSrcData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
log.Logger().DebugX(ctx, "====> After CommitSrcData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -106,7 +106,7 @@ func (flow *KisFlow) commitReuseData(ctx context.Context) error {
|
|||||||
// 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层)
|
// 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层)
|
||||||
flow.buffer = flow.buffer[0:0]
|
flow.buffer = flow.buffer[0:0]
|
||||||
|
|
||||||
log.Logger().DebugFX(ctx, " ====> After commitReuseData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
log.Logger().DebugX(ctx, " ====> After commitReuseData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ func (flow *KisFlow) commitVoidData(ctx context.Context) error {
|
|||||||
// 将本层计算的缓冲数据提交到本层结果数据中
|
// 将本层计算的缓冲数据提交到本层结果数据中
|
||||||
flow.data[flow.ThisFunctionId] = batch
|
flow.data[flow.ThisFunctionId] = batch
|
||||||
|
|
||||||
log.Logger().DebugFX(ctx, " ====> After commitVoidData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
log.Logger().DebugX(ctx, " ====> After commitVoidData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -150,7 +150,7 @@ func (flow *KisFlow) commitCurData(ctx context.Context) error {
|
|||||||
// 清空缓冲Buf
|
// 清空缓冲Buf
|
||||||
flow.buffer = flow.buffer[0:0]
|
flow.buffer = flow.buffer[0:0]
|
||||||
|
|
||||||
log.Logger().DebugFX(ctx, " ====> After commitCurData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
log.Logger().DebugX(ctx, " ====> After commitCurData", "flow_name", flow.Name, "flow_id", flow.Id, "All Level Data", flow.data)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,11 @@ func NewKisFunctionC() kis.Function {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
|
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
|
||||||
log.Logger().DebugF("KisFunctionC", "flow", flow)
|
log.Logger().Debug("KisFunctionC", "flow", flow)
|
||||||
|
|
||||||
// 通过KisPool 路由到具体的执行计算Function中
|
// 通过KisPool 路由到具体的执行计算Function中
|
||||||
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "Function Called Error", "err", err)
|
log.Logger().ErrorX(ctx, "Function Called Error", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,11 +20,11 @@ func NewKisFunctionE() kis.Function {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
|
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
|
||||||
log.Logger().DebugF("KisFunctionE", "flow", flow)
|
log.Logger().Debug("KisFunctionE", "flow", flow)
|
||||||
|
|
||||||
// 通过KisPool 路由到具体的执行计算Function中
|
// 通过KisPool 路由到具体的执行计算Function中
|
||||||
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "Function Called Error", "err", err)
|
log.Logger().ErrorX(ctx, "Function Called Error", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,11 +20,11 @@ func NewKisFunctionL() kis.Function {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
|
func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
|
||||||
log.Logger().DebugF("KisFunctionL", "flow", flow)
|
log.Logger().Debug("KisFunctionL", "flow", flow)
|
||||||
|
|
||||||
// 通过KisPool 路由到具体的执行计算Function中
|
// 通过KisPool 路由到具体的执行计算Function中
|
||||||
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "Function Called Error", "err", err)
|
log.Logger().ErrorX(ctx, "Function Called Error", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,11 +20,11 @@ func NewKisFunctionS() kis.Function {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
|
func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
|
||||||
log.Logger().DebugF("KisFunctionS", "flow", flow)
|
log.Logger().Debug("KisFunctionS", "flow", flow)
|
||||||
|
|
||||||
// 通过KisPool 路由到具体的执行计算Function中
|
// 通过KisPool 路由到具体的执行计算Function中
|
||||||
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "Function Called Error", "err", err)
|
log.Logger().ErrorX(ctx, "Function Called Error", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,11 +20,11 @@ func NewKisFunctionV() kis.Function {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
|
func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
|
||||||
log.Logger().DebugF("KisFunctionV", "flow", flow)
|
log.Logger().Debug("KisFunctionV", "flow", flow)
|
||||||
|
|
||||||
// 通过KisPool 路由到具体的执行计算Function中
|
// 通过KisPool 路由到具体的执行计算Function中
|
||||||
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "Function Called Error", "err", err)
|
log.Logger().ErrorX(ctx, "Function Called Error", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
14
kis/pool.go
14
kis/pool.go
@ -61,7 +61,7 @@ func (pool *kisPool) AddFlow(name string, flow Flow) {
|
|||||||
panic(errString)
|
panic(errString)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().InfoF("Add FlowRouter", "FlowName", name)
|
log.Logger().Info("Add FlowRouter", "FlowName", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *kisPool) GetFlow(name string) Flow {
|
func (pool *kisPool) GetFlow(name string) Flow {
|
||||||
@ -95,7 +95,7 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
|||||||
panic(errString)
|
panic(errString)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().InfoF("Add KisPool", "FuncName", fnName)
|
log.Logger().Info("Add KisPool", "FuncName", fnName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallFunction 调度 Function
|
// CallFunction 调度 Function
|
||||||
@ -127,7 +127,7 @@ func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow)
|
|||||||
// 将flow.Input()中的原始数据,反序列化为argType类型的数据
|
// 将flow.Input()中的原始数据,反序列化为argType类型的数据
|
||||||
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
|
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam", "err", err)
|
log.Logger().ErrorX(ctx, "funcDesc.Serialize.DecodeParam", "err", err)
|
||||||
} else {
|
} else {
|
||||||
params = append(params, value)
|
params = append(params, value)
|
||||||
continue
|
continue
|
||||||
@ -153,7 +153,7 @@ func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().ErrorFX(ctx, "FuncName: Can not find in KisPool, Not Added.", "FuncName", fnName)
|
log.Logger().ErrorX(ctx, "FuncName: Can not find in KisPool, Not Added.", "FuncName", fnName)
|
||||||
|
|
||||||
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
|
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
|
||||||
}
|
}
|
||||||
@ -170,7 +170,7 @@ func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
|
|||||||
panic(errString)
|
panic(errString)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().InfoF("Add KisPool CaaSInit", "CName", cname)
|
log.Logger().Info("Add KisPool CaaSInit", "CName", cname)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallConnInit 调度 ConnInit
|
// CallConnInit 调度 ConnInit
|
||||||
@ -208,7 +208,7 @@ func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c Caa
|
|||||||
panic(errString)
|
panic(errString)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().InfoF("Add KisPool CaaS", "CName", cname, "FName", fname, "Mode", mode)
|
log.Logger().Info("Add KisPool CaaS", "CName", cname, "FName", fname, "Mode", mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallConnector 调度 Connector
|
// CallConnector 调度 Connector
|
||||||
@ -223,7 +223,7 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto
|
|||||||
return callback(ctx, conn, fn, flow, args)
|
return callback(ctx, conn, fn, flow, args)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().ErrorFX(ctx, "Can not find in KisPool, Not Added.", "CName", conn.GetName(), " FName", fnConf.FName, "mode", mode)
|
log.Logger().ErrorX(ctx, "Can not find in KisPool, Not Added.", "CName", conn.GetName(), " FName", fnConf.FName, "mode", mode)
|
||||||
|
|
||||||
return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
|
return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
|
||||||
}
|
}
|
||||||
|
@ -78,27 +78,27 @@ func getKisDefaultSLog(opts ...KisLogOptions) *kisDefaultSlog {
|
|||||||
return defaultKisSlog
|
return defaultKisSlog
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kisDefaultSlog) InfoFX(ctx context.Context, str string, v ...interface{}) {
|
func (k *kisDefaultSlog) InfoX(ctx context.Context, str string, v ...interface{}) {
|
||||||
slog.InfoContext(ctx, str, v...)
|
slog.InfoContext(ctx, str, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kisDefaultSlog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
|
func (k *kisDefaultSlog) ErrorX(ctx context.Context, str string, v ...interface{}) {
|
||||||
slog.ErrorContext(ctx, str, v...)
|
slog.ErrorContext(ctx, str, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kisDefaultSlog) DebugFX(ctx context.Context, str string, v ...interface{}) {
|
func (k *kisDefaultSlog) DebugX(ctx context.Context, str string, v ...interface{}) {
|
||||||
slog.DebugContext(ctx, str, v...)
|
slog.DebugContext(ctx, str, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kisDefaultSlog) InfoF(str string, v ...interface{}) {
|
func (k *kisDefaultSlog) Info(str string, v ...interface{}) {
|
||||||
slog.Info(str, v...)
|
slog.Info(str, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kisDefaultSlog) ErrorF(str string, v ...interface{}) {
|
func (k *kisDefaultSlog) Error(str string, v ...interface{}) {
|
||||||
slog.Error(str, v...)
|
slog.Error(str, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kisDefaultSlog) DebugF(str string, v ...interface{}) {
|
func (k *kisDefaultSlog) Debug(str string, v ...interface{}) {
|
||||||
slog.Debug(str, v...)
|
slog.Debug(str, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,19 +3,19 @@ package log
|
|||||||
import "context"
|
import "context"
|
||||||
|
|
||||||
type KisLogger interface {
|
type KisLogger interface {
|
||||||
// InfoFX 有上下文的Info级别日志接口, format字符串格式
|
// InfoX 有上下文的Info级别日志接口, format字符串格式
|
||||||
InfoFX(ctx context.Context, str string, v ...interface{})
|
InfoX(ctx context.Context, str string, v ...interface{})
|
||||||
// ErrorFX 有上下文的Error级别日志接口, format字符串格式
|
// ErrorX 有上下文的Error级别日志接口, format字符串格式
|
||||||
ErrorFX(ctx context.Context, str string, v ...interface{})
|
ErrorX(ctx context.Context, str string, v ...interface{})
|
||||||
// DebugFX 有上下文的Debug级别日志接口, format字符串格式
|
// DebugX 有上下文的Debug级别日志接口, format字符串格式
|
||||||
DebugFX(ctx context.Context, str string, v ...interface{})
|
DebugX(ctx context.Context, str string, v ...interface{})
|
||||||
|
|
||||||
// InfoF 无上下文的Info级别日志接口, format字符串格式
|
// Info 无上下文的Info级别日志接口, format字符串格式
|
||||||
InfoF(str string, v ...interface{})
|
Info(str string, v ...interface{})
|
||||||
// ErrorF 无上下文的Error级别日志接口, format字符串格式
|
// Error 无上下文的Error级别日志接口, format字符串格式
|
||||||
ErrorF(str string, v ...interface{})
|
Error(str string, v ...interface{})
|
||||||
// DebugF 无上下文的Debug级别日志接口, format字符串格式
|
// Debug 无上下文的Debug级别日志接口, format字符串格式
|
||||||
DebugF(str string, v ...interface{})
|
Debug(str string, v ...interface{})
|
||||||
|
|
||||||
// SetDebugMode 设置Debug模式
|
// SetDebugMode 设置Debug模式
|
||||||
SetDebugMode()
|
SetDebugMode()
|
||||||
|
@ -36,7 +36,7 @@ func RunMetricsService(serverAddr string) error {
|
|||||||
// 启动HttpServer
|
// 启动HttpServer
|
||||||
err := http.ListenAndServe(serverAddr, nil) // 多个进程不可监听同一个端口
|
err := http.ListenAndServe(serverAddr, nil) // 多个进程不可监听同一个端口
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Logger().ErrorF("RunMetricsService", "err", err)
|
log.Logger().Error("RunMetricsService", "err", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
@ -19,12 +19,12 @@ func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
|
|||||||
|
|
||||||
conn, err := flow.GetConnector()
|
conn, err := flow.GetConnector()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector", "err", err.Error())
|
log.Logger().ErrorX(ctx, "FuncDemo2Handler(): GetConnector", "err", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := conn.Call(ctx, flow, row); err != nil {
|
if _, err := conn.Call(ctx, flow, row); err != nil {
|
||||||
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call", "err", err.Error())
|
log.Logger().ErrorX(ctx, "FuncDemo2Handler(): Call", "err", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ func TestNewFuncConfig(t *testing.T) {
|
|||||||
|
|
||||||
myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
|
myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
|
||||||
|
|
||||||
log.Logger().InfoF("funcName1", myFunc1)
|
log.Logger().Info("funcName1", myFunc1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewFlowConfig(t *testing.T) {
|
func TestNewFlowConfig(t *testing.T) {
|
||||||
@ -50,7 +50,7 @@ func TestNewFlowConfig(t *testing.T) {
|
|||||||
myFlow1.AppendFunctionConfig(flowFuncParams1)
|
myFlow1.AppendFunctionConfig(flowFuncParams1)
|
||||||
myFlow1.AppendFunctionConfig(flowFuncParams2)
|
myFlow1.AppendFunctionConfig(flowFuncParams2)
|
||||||
|
|
||||||
log.Logger().InfoF("myFlow1", myFlow1)
|
log.Logger().Info("myFlow1", myFlow1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewConnConfig(t *testing.T) {
|
func TestNewConnConfig(t *testing.T) {
|
||||||
@ -81,8 +81,8 @@ func TestNewConnConfig(t *testing.T) {
|
|||||||
myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
|
myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
|
||||||
|
|
||||||
if err := myConnector1.WithFunc(myFunc1); err != nil {
|
if err := myConnector1.WithFunc(myFunc1); err != nil {
|
||||||
log.Logger().ErrorF("WithFunc", "err", err.Error())
|
log.Logger().Error("WithFunc", "err", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logger().InfoF("myConnector1", myConnector1)
|
log.Logger().Info("myConnector1", myConnector1)
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ func TestForkFlowCommitBatch(t *testing.T) {
|
|||||||
|
|
||||||
// 3. 提交原始数据
|
// 3. 提交原始数据
|
||||||
if err := flow1.CommitRowBatch(stringRows); err != nil {
|
if err := flow1.CommitRowBatch(stringRows); err != nil {
|
||||||
log.Logger().ErrorF("CommitRowBatch Error", "err", err)
|
log.Logger().Error("CommitRowBatch Error", "err", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,11 +9,11 @@ import (
|
|||||||
func TestKisLogger(t *testing.T) {
|
func TestKisLogger(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
|
log.Logger().InfoX(ctx, "TestKisLogger InfoX")
|
||||||
log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
|
log.Logger().ErrorX(ctx, "TestKisLogger ErrorX")
|
||||||
log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")
|
log.Logger().DebugX(ctx, "TestKisLogger DebugX")
|
||||||
|
|
||||||
log.Logger().InfoF("TestKisLogger InfoF")
|
log.Logger().Info("TestKisLogger Info")
|
||||||
log.Logger().ErrorF("TestKisLogger ErrorF")
|
log.Logger().Error("TestKisLogger Error")
|
||||||
log.Logger().DebugF("TestKisLogger DebugF")
|
log.Logger().Debug("TestKisLogger Debug")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user