http server 代码如下:
package main
import (
"net/http"
)
func main() {
http.HandleFunc("/eof", func(w http.ResponseWriter, r *http.Request) {
// 获取底层连接,提前关闭,模拟EOF
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
return
}
conn, _, err := hj.Hijack()
if err != nil {
return
}
conn.Close() // 直接关闭连接,不返回任何内容
})
// http.HandleFunc("/eof2", func(w http.ResponseWriter, r *http.Request) {
// w.Header().Set("Content-Length", "100")
// // 故意不写 body,客户端会尝试读 100 字节,实际为 0,会遇到 EOF
// })
http.ListenAndServe(":8088", nil)
}上面是模仿服务端返回 EOF 报错的场景。
客户端的代码如下:
package main
import (
"fmt"
"bytes"
"net/http"
"encoding/json"
"time"
// "strconv"
"io"
"errors"
)
type User struct {
Username string `json:"username"`
}
func SendFlashDuty(events []*User, flashDutyChannelID int64, client *http.Client) (string, error) {
// todo 每一个 channel 批量发送事件
if client == nil {
return "", fmt.Errorf("http client not found")
}
body, err := json.Marshal(events)
if err != nil {
return "", err
}
req, err := http.NewRequest("POST", "http://127.0.0.1:8088/eof", bytes.NewBuffer(body))
if err != nil {
fmt.Printf("failed to create request: %v, event: %v\n", err, events)
return "", err
}
// 设置 URL 参数
// query := req.URL.Query()
// if flashDutyChannelID != 0 {
// // 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
// query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
// }
// req.URL.RawQuery = query.Encode()
// req.Header.Add("Content-Type", "application/json")
// 获取重试配置,设置默认值
retryTimes := 3
// 重试机制
for i := 0; i <= retryTimes; i++ {
fmt.Printf("111 send flashduty body:%+v\n", string(body))
// 直接使用客户端发送请求,超时时间已经在 client 中设置
resp, err := client.Do(req)
if err != nil {
fmt.Printf("222 send flashduty err:%v times:%d\n", err, i+1)
if i < retryTimes {
time.Sleep(time.Duration(100) * time.Millisecond)
}
continue
}
defer resp.Body.Close()
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("failed to read response: %v, event: %v\n", err, events)
}
fmt.Printf("send flashduty req:%+v resp:%+v body:%+v err:%v times:%d\n", req, resp, string(body), err, i+1)
if resp.StatusCode == http.StatusOK {
return string(body), nil
}
if i < retryTimes {
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
return "", errors.New("failed to send request")
}
func main() {
event := &User{
Username: "hello",
}
client := &http.Client{
Timeout: 10 * time.Second,
}
res, err := SendFlashDuty([]*User{event}, 1, client)
fmt.Println(">>", res, err)
}上面的代码基本可以看做是模仿了夜莺调用 Duty 的逻辑。执行的时候报错如下:
ulric@ulric-flashcat eof-server % go run n9e.go
111 send flashduty body:[{"username":"hello"}]
222 send flashduty err:Post "http://127.0.0.1:8088/eof": EOF times:1
111 send flashduty body:[{"username":"hello"}]
222 send flashduty err:Post "http://127.0.0.1:8088/eof": http: ContentLength=22 with Body length 0 times:2
111 send flashduty body:[{"username":"hello"}]
222 send flashduty err:Post "http://127.0.0.1:8088/eof": http: ContentLength=22 with Body length 0 times:3
111 send flashduty body:[{"username":"hello"}]
222 send flashduty err:Post "http://127.0.0.1:8088/eof": http: ContentLength=22 with Body length 0 times:4
>> failed to send request
改造之后,正确的代码是:
package main
import (
"fmt"
"bytes"
"net/http"
"encoding/json"
"time"
// "strconv"
"io"
"errors"
)
type User struct {
Username string `json:"username"`
}
func getRequest(bs []byte) *http.Request {
req, err := http.NewRequest("POST", "http://127.0.0.1:8088/eof", bytes.NewBuffer(bs))
if err != nil {
fmt.Printf("failed to create request: %v, request body: %v\n", err, string(bs))
panic("panic1")
return nil
}
return req
}
func SendFlashDuty(events []*User, flashDutyChannelID int64, client *http.Client) (string, error) {
// todo 每一个 channel 批量发送事件
if client == nil {
return "", fmt.Errorf("http client not found")
}
body, err := json.Marshal(events)
if err != nil {
return "", err
}
// 获取重试配置,设置默认值
retryTimes := 3
// 重试机制
for i := 0; i <= retryTimes; i++ {
// 直接使用客户端发送请求,超时时间已经在 client 中设置
resp, err := client.Do(getRequest(body))
if err != nil {
fmt.Printf("222 send flashduty err:%v times:%d\n", err, i+1)
if i < retryTimes {
time.Sleep(time.Duration(100) * time.Millisecond)
}
continue
}
defer resp.Body.Close()
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("failed to read response: %v, event: %v\n", err, events)
}
// fmt.Printf("send flashduty req:%+v resp:%+v body:%+v err:%v times:%d\n", req, resp, string(body), err, i+1)
if resp.StatusCode == http.StatusOK {
return string(body), nil
}
if i < retryTimes {
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
return "", errors.New("failed to send request")
}
func main() {
event := &User{
Username: "hello",
}
client := &http.Client{
Timeout: 10 * time.Second,
}
res, err := SendFlashDuty([]*User{event}, 1, client)
fmt.Println(">>", res, err)
}改造之后,执行,错误变成了:
ulric@ulric-flashcat eof-server % go run n9e2.go
222 send flashduty err:Post "http://127.0.0.1:8088/eof": EOF times:1
222 send flashduty err:Post "http://127.0.0.1:8088/eof": EOF times:2
222 send flashduty err:Post "http://127.0.0.1:8088/eof": EOF times:3
222 send flashduty err:Post "http://127.0.0.1:8088/eof": EOF times:4
>> failed to send request
- 结论:从第二次开始的重试,都无法起到重试效果
- 改造:不要复用 io.Reader,而是每次重试都搞一个新的 http request 即可