## Pipe-filter pattern

* 非常適合與數據處理及數據分析系統
* Filter 封裝數據處理的功能
* 松耦合: Filter只跟數據(格式)耦合
* Pipe用于連接Filter傳遞數據或者在異步處理過程中緩沖數據流進程內同步調用時,pipe演變為數據在方法調用間傳遞
示例,pipe處理流程:
SplitFilter -> StringToIntFilter -> SumFilter
filter.go
~~~
package pipefilter
import "errors"
type Request interface{}
type Response interface{}
type Filter interface {
Process(data Request) (Response, error)
}
var DataTypeErr = errors.New("type error of data")
var StringToIntErr = errors.New("string to int error")
~~~
split_filter.go
~~~
package pipefilter
import (
"fmt"
"strings"
)
type SplitFilter struct {
separator string
}
func NewSplitFilter(separator string) *SplitFilter {
return &SplitFilter{separator}
}
func (filter *SplitFilter) Process(data Request) (Response, error) {
fmt.Println("SplitFilter data:", data)
result, ok := data.(string)
if !ok {
return nil, DataTypeErr
}
parts := strings.Split(result, filter.separator)
return parts, nil
}
~~~
string_to_int_filter.go
~~~
package pipefilter
import (
"fmt"
"strconv"
)
type StringToIntFilter struct {
}
func NewStringToIntFilter() *StringToIntFilter {
return new(StringToIntFilter)
}
func (filter *StringToIntFilter) Process(data Request) (Response, error) {
fmt.Println("StringToIntFilter data:", data)
parts, ok := data.([]string)
if !ok {
return nil, DataTypeErr
}
result := []int{}
for _, str := range parts {
num, err := strconv.Atoi(str)
if err != nil {
return nil, StringToIntErr
}
result = append(result, num)
}
return result, nil
}
~~~
sum_filter.go
~~~
package pipefilter
import "fmt"
type SumFilter struct {
}
func NewSumFilter() *SumFilter {
return new(SumFilter)
}
func (filter *SumFilter) Process(data Request) (Response, error) {
fmt.Println("SumFilter data:", data)
arr, ok := data.([]int)
if !ok {
return nil, DataTypeErr
}
summary := 0
for _, num := range arr {
summary += num
}
return summary, nil
}
~~~
pipeline.go
~~~
package pipefilter
type PipeLine struct {
Name string
Filters *[]Filter
}
func NewPipeLine(name string, filters ...Filter) *PipeLine {
return &PipeLine{
Name: name,
Filters: &filters,
}
}
func (pipe *PipeLine) Process(param Request) (Response, error) {
var err error
var data interface{}
for _, filter := range *pipe.Filters {
data, err = filter.Process(param)
if err != nil {
return data, err
}
param = data
}
return data, err
}
~~~
filter_test.go
~~~
package pipefilter
import "testing"
func TestStringSplitFilter(t *testing.T) {
pipe := NewPipeLine("pipeline", NewSplitFilter(","), NewStringToIntFilter(), NewSumFilter())
data := "1,3,5,7,9"
result, err := pipe.Process(data)
if err != nil {
t.Error(err)
} else {
t.Log("result=>", result)
}
}
~~~
- 概述
- go語言基礎特性
- Go語言聲明
- Go項目構建及編譯
- go command
- 程序設計原則
- Go基礎
- 變量
- 常量
- iota
- 基本類型
- byte和rune類型
- 類型定義和類型別名
- 數組
- string
- 高效字符串連接
- string底層原理
- 運算符
- new
- make
- 指針
- 下劃線 & import
- 語法糖
- 簡短變量申明
- 流程控制
- ifelse
- switch
- select
- select實現原理
- select常見案例
- for
- range
- range實現原理
- 常見案例
- range陷阱
- Goto&Break&Continue
- Go函數
- 函數
- 可變參數函數
- 高階函數
- init函數和main函數
- 匿名函數
- 閉包
- 常用內置函數
- defer
- defer常見案例
- defer規則
- defer與函數返回值
- defer實現原理
- defer陷阱
- 數據結構
- slice
- slice內存布局
- slice&array
- slice底層實現
- slice陷阱
- map
- Map實現原理
- 集合
- List
- Set
- 線程安全數據結構
- sync.Map
- Concurrent Map
- 面向對象編程
- struct
- 匿名結構體&匿名字段
- 嵌套結構體
- 結構體的“繼承”
- struct tag
- 行為方法
- 方法與函數
- type Method Value & Method Expressions
- interface
- 類型斷言
- 多態
- 錯誤機制
- error
- 自定義錯誤
- panic&recover
- reflect
- reflect包
- 應用示例
- DeepEqual
- 反射-fillObjectField
- 反射-copyObject
- IO
- 讀取文件
- 寫文件
- bufio
- ioutil
- Go網絡編程
- tcp
- tcp粘包
- udp
- HTTP
- http服務
- httprouter
- webSocket
- go并發編程
- Goroutine
- thread vs goroutine
- Goroutine任務取消
- 通過channel廣播實現
- Context
- Goroutine調度機制
- goroutine調度器1.0
- GMP模型調度器
- 調度器竊取策略
- 調度器的生命周期
- 調度過程全解析
- channel
- 無緩沖的通道
- 緩沖信道
- 單向信道
- chan實現原理
- 共享內存并發機制
- mutex互斥鎖
- mutex
- mutex原理
- mutex模式
- RWLock
- 使用信道處理競態條件
- WaitGroup
- 工作池
- 并發任務
- once運行一次
- 僅需任意任務完成
- 所有任務完成
- 對象池
- 定時器Timer
- Timer
- Timer實現原理
- 周期性定時器Ticker
- Ticker對外接口
- ticker使用場景
- ticker實現原理
- ticker使用陷阱
- 包和依賴管理
- package
- 依賴管理
- 測試
- 單元測試
- 表格測試法
- Banchmark
- BDD
- 常用架構模式
- Pipe-filter pattern
- Micro Kernel
- JSON
- json-內置解析器
- easyjson
- 性能分析
- gc
- 工具類
- fmt
- Time
- builtin
- unsafe
- sync.pool
- atomic
- flag
- runtime
- strconv
- template