diff --git a/experimental/libbox/command_client.go b/experimental/libbox/command_client.go index 199dce0d..bd995115 100644 --- a/experimental/libbox/command_client.go +++ b/experimental/libbox/command_client.go @@ -25,8 +25,8 @@ type CommandClientOptions struct { type CommandClientHandler interface { Connected() Disconnected(message string) - ClearLog() - WriteLog(message string) + ClearLogs() + WriteLogs(messageList StringIterator) WriteStatus(message *StatusMessage) WriteGroups(message OutboundGroupIterator) InitializeClashMode(modeList StringIterator, currentMode string) @@ -84,6 +84,10 @@ func (c *CommandClient) Connect() error { } switch c.options.Command { case CommandLog: + err = binary.Write(conn, binary.BigEndian, c.options.StatusInterval) + if err != nil { + return E.Cause(err, "write interval") + } c.handler.Connected() go c.handleLogConn(conn) case CommandStatus: diff --git a/experimental/libbox/command_log.go b/experimental/libbox/command_log.go index ce72010d..8a22aa2e 100644 --- a/experimental/libbox/command_log.go +++ b/experimental/libbox/command_log.go @@ -1,10 +1,14 @@ package libbox import ( + "bufio" "context" - "encoding/binary" "io" "net" + "time" + + "github.com/sagernet/sing/common/binary" + E "github.com/sagernet/sing/common/exceptions" ) func (s *CommandServer) WriteMessage(message string) { @@ -17,43 +21,39 @@ func (s *CommandServer) WriteMessage(message string) { s.access.Unlock() } -func readLog(reader io.Reader) ([]byte, error) { - var messageLength uint16 - err := binary.Read(reader, binary.BigEndian, &messageLength) - if err != nil { - return nil, err - } - if messageLength == 0 { - return nil, nil - } - data := make([]byte, messageLength) - _, err = io.ReadFull(reader, data) - if err != nil { - return nil, err - } - return data, nil -} - -func writeLog(writer io.Writer, message []byte) error { +func writeLog(writer *bufio.Writer, messages []string) error { err := binary.Write(writer, binary.BigEndian, uint8(0)) if err != nil { return err } - err = binary.Write(writer, binary.BigEndian, uint16(len(message))) + err = binary.WriteData(writer, binary.BigEndian, messages) if err != nil { return err } - if len(message) > 0 { - _, err = writer.Write(message) - } - return err + return writer.Flush() } -func writeClearLog(writer io.Writer) error { - return binary.Write(writer, binary.BigEndian, uint8(1)) +func writeClearLog(writer *bufio.Writer) error { + err := binary.Write(writer, binary.BigEndian, uint8(1)) + if err != nil { + return err + } + return writer.Flush() } func (s *CommandServer) handleLogConn(conn net.Conn) error { + var ( + interval int64 + timer *time.Timer + ) + err := binary.Read(conn, binary.BigEndian, &interval) + if err != nil { + return E.Cause(err, "read interval") + } + timer = time.NewTimer(time.Duration(interval)) + if !timer.Stop() { + <-timer.C + } var savedLines []string s.access.Lock() savedLines = make([]string, 0, s.savedLines.Len()) @@ -66,52 +66,67 @@ func (s *CommandServer) handleLogConn(conn net.Conn) error { return err } defer s.observer.UnSubscribe(subscription) - for _, line := range savedLines { - err = writeLog(conn, []byte(line)) + writer := bufio.NewWriter(conn) + if len(savedLines) > 0 { + err = writeLog(writer, savedLines) if err != nil { return err } } ctx := connKeepAlive(conn) + var logLines []string for { select { case <-ctx.Done(): return ctx.Err() - case message := <-subscription: - err = writeLog(conn, []byte(message)) - if err != nil { - return err - } case <-s.logReset: - err = writeClearLog(conn) + err = writeClearLog(writer) if err != nil { return err } case <-done: return nil + case logLine := <-subscription: + logLines = logLines[:0] + logLines = append(logLines, logLine) + timer.Reset(time.Duration(interval)) + loopLogs: + for { + select { + case logLine = <-subscription: + logLines = append(logLines, logLine) + case <-timer.C: + break loopLogs + } + } + err = writeLog(writer, logLines) + if err != nil { + return err + } } } } func (c *CommandClient) handleLogConn(conn net.Conn) { + reader := bufio.NewReader(conn) for { var messageType uint8 - err := binary.Read(conn, binary.BigEndian, &messageType) + err := binary.Read(reader, binary.BigEndian, &messageType) if err != nil { c.handler.Disconnected(err.Error()) return } - var message []byte + var messages []string switch messageType { case 0: - message, err = readLog(conn) + err = binary.ReadData(reader, binary.BigEndian, &messages) if err != nil { c.handler.Disconnected(err.Error()) return } - c.handler.WriteLog(string(message)) + c.handler.WriteLogs(newIterator(messages)) case 1: - c.handler.ClearLog() + c.handler.ClearLogs() } } } @@ -120,7 +135,7 @@ func connKeepAlive(reader io.Reader) context.Context { ctx, cancel := context.WithCancelCause(context.Background()) go func() { for { - _, err := readLog(reader) + _, err := reader.Read(make([]byte, 1)) if err != nil { cancel(err) return diff --git a/experimental/libbox/iterator.go b/experimental/libbox/iterator.go index 530a7e43..50d7385b 100644 --- a/experimental/libbox/iterator.go +++ b/experimental/libbox/iterator.go @@ -3,8 +3,9 @@ package libbox import "github.com/sagernet/sing/common" type StringIterator interface { - Next() string + Len() int32 HasNext() bool + Next() string } var _ StringIterator = (*iterator[string])(nil) @@ -21,6 +22,14 @@ func newPtrIterator[T any](values []T) *iterator[*T] { return &iterator[*T]{common.Map(values, func(value T) *T { return &value })} } +func (i *iterator[T]) Len() int32 { + return int32(len(i.values)) +} + +func (i *iterator[T]) HasNext() bool { + return len(i.values) > 0 +} + func (i *iterator[T]) Next() T { if len(i.values) == 0 { return common.DefaultValue[T]() @@ -30,10 +39,6 @@ func (i *iterator[T]) Next() T { return nextValue } -func (i *iterator[T]) HasNext() bool { - return len(i.values) > 0 -} - type abstractIterator[T any] interface { Next() T HasNext() bool