chore: refactor httpx.TimeoutHandler (#3400)

This commit is contained in:
Kevin Wan 2023-07-09 15:04:59 +08:00 committed by GitHub
parent 3170afd57b
commit 31b9ba19a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 37 deletions

View File

@ -127,18 +127,29 @@ type timeoutWriter struct {
var _ http.Pusher = (*timeoutWriter)(nil) var _ http.Pusher = (*timeoutWriter)(nil)
// Flush implements the Flusher interface.
func (tw *timeoutWriter) Flush() { func (tw *timeoutWriter) Flush() {
dst := tw.w.Header() flusher, ok := tw.w.(http.Flusher)
for k, vv := range tw.h { if !ok {
dst[k] = vv return
} }
if flusher, ok := tw.w.(http.Flusher); ok {
tw.w.Write(tw.wbuf.Bytes()) header := tw.w.Header()
tw.wbuf.Reset() for k, v := range tw.h {
flusher.Flush() header[k] = v
} }
tw.w.Write(tw.wbuf.Bytes())
tw.wbuf.Reset()
flusher.Flush()
} }
// Header returns the underline temporary http.Header.
func (tw *timeoutWriter) Header() http.Header {
return tw.h
}
// Hijack implements the Hijacker interface.
func (tw *timeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { func (tw *timeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if hijacked, ok := tw.w.(http.Hijacker); ok { if hijacked, ok := tw.w.(http.Hijacker); ok {
return hijacked.Hijack() return hijacked.Hijack()
@ -147,14 +158,12 @@ func (tw *timeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return nil, nil, errors.New("server doesn't support hijacking") return nil, nil, errors.New("server doesn't support hijacking")
} }
// Header returns the underline temporary http.Header.
func (tw *timeoutWriter) Header() http.Header { return tw.h }
// Push implements the Pusher interface. // Push implements the Pusher interface.
func (tw *timeoutWriter) Push(target string, opts *http.PushOptions) error { func (tw *timeoutWriter) Push(target string, opts *http.PushOptions) error {
if pusher, ok := tw.w.(http.Pusher); ok { if pusher, ok := tw.w.(http.Pusher); ok {
return pusher.Push(target, opts) return pusher.Push(target, opts)
} }
return http.ErrNotSupported return http.ErrNotSupported
} }
@ -171,6 +180,7 @@ func (tw *timeoutWriter) Write(p []byte) (int, error) {
if !tw.wroteHeader { if !tw.wroteHeader {
tw.writeHeaderLocked(http.StatusOK) tw.writeHeaderLocked(http.StatusOK)
} }
return tw.wbuf.Write(p) return tw.wbuf.Write(p)
} }

View File

@ -17,35 +17,63 @@ import (
) )
func TestTimeoutWriteFlushOutput(t *testing.T) { func TestTimeoutWriteFlushOutput(t *testing.T) {
timeoutHandler := TimeoutHandler(1000 * time.Millisecond) t.Run("flusher", func(t *testing.T) {
handler := timeoutHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { timeoutHandler := TimeoutHandler(1000 * time.Millisecond)
w.Header().Set("Content-Type", "text/event-stream;charset=utf-8") handler := timeoutHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher) w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
if !ok { flusher, ok := w.(http.Flusher)
http.Error(w, "Flushing not supported", http.StatusInternalServerError) if !ok {
return http.Error(w, "Flushing not supported", http.StatusInternalServerError)
return
}
for i := 1; i <= 5; i++ {
fmt.Fprint(w, strconv.Itoa(i)+" cats\n\n")
flusher.Flush()
time.Sleep(time.Millisecond)
}
}))
req := httptest.NewRequest(http.MethodGet, "http://localhost", http.NoBody)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
scanner := bufio.NewScanner(resp.Body)
var cats int
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "cats") {
cats++
}
} }
for i := 1; i <= 5; i++ { if err := scanner.Err(); err != nil {
fmt.Fprint(w, strconv.Itoa(i)+"只猫猫\n\n") cats = 0
flusher.Flush()
time.Sleep(time.Millisecond)
} }
})) assert.Equal(t, 5, cats)
req := httptest.NewRequest(http.MethodGet, "http://localhost", http.NoBody) })
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req) t.Run("writer", func(t *testing.T) {
scanner := bufio.NewScanner(resp.Body) recorder := httptest.NewRecorder()
mao := 0 timeoutHandler := TimeoutHandler(1000 * time.Millisecond)
for scanner.Scan() { handler := timeoutHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
line := scanner.Text() w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
if strings.Contains(line, "猫猫") { flusher, ok := w.(http.Flusher)
mao++ if !ok {
} http.Error(w, "Flushing not supported", http.StatusInternalServerError)
} return
if err := scanner.Err(); err != nil { }
mao = 0
} for i := 1; i <= 5; i++ {
assert.Equal(t, "5只猫猫", strconv.Itoa(mao)+"只猫猫") fmt.Fprint(w, strconv.Itoa(i)+" cats\n\n")
flusher.Flush()
time.Sleep(time.Millisecond)
assert.Empty(t, recorder.Body.String())
}
}))
req := httptest.NewRequest(http.MethodGet, "http://localhost", http.NoBody)
resp := mockedResponseWriter{recorder}
handler.ServeHTTP(resp, req)
assert.Equal(t, "1 cats\n\n2 cats\n\n3 cats\n\n4 cats\n\n5 cats\n\n",
recorder.Body.String())
})
} }
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
@ -274,3 +302,19 @@ func (m mockedPusher) WriteHeader(_ int) {
func (m mockedPusher) Push(_ string, _ *http.PushOptions) error { func (m mockedPusher) Push(_ string, _ *http.PushOptions) error {
panic("implement me") panic("implement me")
} }
type mockedResponseWriter struct {
http.ResponseWriter
}
func (m mockedResponseWriter) Header() http.Header {
return m.ResponseWriter.Header()
}
func (m mockedResponseWriter) Write(bytes []byte) (int, error) {
return m.ResponseWriter.Write(bytes)
}
func (m mockedResponseWriter) WriteHeader(statusCode int) {
m.ResponseWriter.WriteHeader(statusCode)
}