From 254b2d983a01f5d68dd79c19098a4e160a8b309f Mon Sep 17 00:00:00 2001 From: Kong Jun Date: Sat, 5 Oct 2024 21:54:54 +0800 Subject: [PATCH] feat: update ss2022 inbound users dynamically --- box.go | 61 +++++++- inbound/shadowsocks_multi.go | 45 +++++- option/config.go | 5 + proto/cybertom.proto | 17 +++ proto/cybertom/cybertom.pb.go | 220 +++++++++++++++++++++++++++++ proto/cybertom/cybertom_grpc.pb.go | 121 ++++++++++++++++ 6 files changed, 455 insertions(+), 14 deletions(-) create mode 100644 proto/cybertom.proto create mode 100644 proto/cybertom/cybertom.pb.go create mode 100644 proto/cybertom/cybertom_grpc.pb.go diff --git a/box.go b/box.go index 716b1b09..cba4b144 100644 --- a/box.go +++ b/box.go @@ -4,26 +4,33 @@ import ( "context" "fmt" "io" + "net" "os" "runtime/debug" "time" + "github.com/sagernet/sing/common" + E "github.com/sagernet/sing/common/exceptions" + F "github.com/sagernet/sing/common/format" + "github.com/sagernet/sing/service" + "github.com/sagernet/sing/service/pause" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "github.com/sagernet/sing-box/inbound" + pb "github.com/sagernet/sing-box/proto/cybertom" + "github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/common/taskmonitor" C "github.com/sagernet/sing-box/constant" "github.com/sagernet/sing-box/experimental" "github.com/sagernet/sing-box/experimental/cachefile" "github.com/sagernet/sing-box/experimental/libbox/platform" - "github.com/sagernet/sing-box/inbound" + pkginbound "github.com/sagernet/sing-box/inbound" "github.com/sagernet/sing-box/log" "github.com/sagernet/sing-box/option" "github.com/sagernet/sing-box/outbound" "github.com/sagernet/sing-box/route" - "github.com/sagernet/sing/common" - E "github.com/sagernet/sing/common/exceptions" - F "github.com/sagernet/sing/common/format" - "github.com/sagernet/sing/service" - "github.com/sagernet/sing/service/pause" ) var _ adapter.Service = (*Box)(nil) @@ -39,6 +46,7 @@ type Box struct { preServices2 map[string]adapter.Service postServices map[string]adapter.Service done chan struct{} + options *Options } type Options struct { @@ -194,6 +202,7 @@ func New(options Options) (*Box, error) { preServices2: preServices2, postServices: postServices, done: make(chan struct{}), + options: options, }, nil } @@ -271,7 +280,27 @@ func (s *Box) preStart() error { if err != nil { return err } - return s.router.Start() + + err = s.router.Start() + if err != nil { + return err + } + return s.startGRPCServer() +} + +func (s *Box) startGRPCServer() error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.options.TomConfig.ListenPort)) + if err != nil { + return fmt.Errorf("CyberTom Start: failed to listen: %v", err) + } + srv := grpc.NewServer() + pb.RegisterCyberTom(srv, s) + go func() { + if err := srv.Serve(lis); err != nil { + panic(fmt.Errorf("Cyber Start: failed to serve: %v", err)) + } + }() + return nil } func (s *Box) start() error { @@ -403,3 +432,21 @@ func (s *Box) Close() error { func (s *Box) Router() adapter.Router { return s.router } + +func (s *Box) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserReply, error) { + for _, inbound := range s.inbounds { + if inbound.Tag() == req.Tag { + switch inbound.(type) { + case *pkginbound.ShadowsocksMulti: + in := inbound.(*pkginbound.ShadowsocksMulti) + err := in.UpdateUserPassword(req.Users, req.Passwords) + if err != nil { + return nil, grpc.Errorf(codes.Internal, err.Error()) + } + return &pb.UpdateUserReply{}, nil + } + break + } + } + return nil, grpc.Errorf(codes.InvalidArgument, "unsupported protocol") +} diff --git a/inbound/shadowsocks_multi.go b/inbound/shadowsocks_multi.go index a291af4a..8ebef6fb 100644 --- a/inbound/shadowsocks_multi.go +++ b/inbound/shadowsocks_multi.go @@ -4,15 +4,10 @@ import ( "context" "net" "os" + "sync" "time" - "github.com/sagernet/sing-box/adapter" - "github.com/sagernet/sing-box/common/mux" - "github.com/sagernet/sing-box/common/uot" - C "github.com/sagernet/sing-box/constant" - "github.com/sagernet/sing-box/log" - "github.com/sagernet/sing-box/option" - "github.com/sagernet/sing-shadowsocks" + shadowsocks "github.com/sagernet/sing-shadowsocks" "github.com/sagernet/sing-shadowsocks/shadowaead" "github.com/sagernet/sing-shadowsocks/shadowaead_2022" "github.com/sagernet/sing/common" @@ -22,6 +17,13 @@ import ( F "github.com/sagernet/sing/common/format" N "github.com/sagernet/sing/common/network" "github.com/sagernet/sing/common/ntp" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/common/mux" + "github.com/sagernet/sing-box/common/uot" + C "github.com/sagernet/sing-box/constant" + "github.com/sagernet/sing-box/log" + "github.com/sagernet/sing-box/option" ) var ( @@ -33,6 +35,7 @@ type ShadowsocksMulti struct { myInboundAdapter service shadowsocks.MultiService[int] users []option.ShadowsocksUser + mu sync.RWMutex } func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, options option.ShadowsocksInboundOptions) (*ShadowsocksMulti, error) { @@ -95,10 +98,14 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log. } func (h *ShadowsocksMulti) NewConnection(ctx context.Context, conn net.Conn, metadata adapter.InboundContext) error { + h.mu.RLock() + defer h.mu.RUnlock() return h.service.NewConnection(adapter.WithContext(log.ContextWithNewID(ctx), &metadata), conn, adapter.UpstreamMetadata(metadata)) } func (h *ShadowsocksMulti) NewPacket(ctx context.Context, conn N.PacketConn, buffer *buf.Buffer, metadata adapter.InboundContext) error { + h.mu.RLock() + defer h.mu.RUnlock() return h.service.NewPacket(adapter.WithContext(ctx, &metadata), conn, buffer, adapter.UpstreamMetadata(metadata)) } @@ -126,7 +133,9 @@ func (h *ShadowsocksMulti) newPacketConnection(ctx context.Context, conn N.Packe if !loaded { return os.ErrInvalid } + h.mu.RLock() user := h.users[userIndex].Name + h.mu.RUnlock() if user == "" { user = F.ToString(userIndex) } else { @@ -137,3 +146,25 @@ func (h *ShadowsocksMulti) newPacketConnection(ctx context.Context, conn N.Packe h.logger.InfoContext(ctx, "[", user, "] inbound packet connection to ", metadata.Destination) return h.router.RoutePacketConnection(ctx, conn, metadata) } + +func (h *ShadowsocksMulti) UpdateUserPassword(users, passwords []string) error { + shadowsocksUsers := make([]option.ShadowsocksUser, 0, len(users)) + for i, user := range users { + shadowsocksUsers = append(shadowsocksUsers, option.ShadowsocksUser{Name: user, Password: passwords[i]}) + } + + // mu must be locked before UpdateUsersWithPasswords + h.mu.Lock() + defer h.mu.Unlock() + err := h.service.UpdateUsersWithPasswords(common.MapIndexed(shadowsocksUsers, func(index int, user option.ShadowsocksUser) int { + return index + }), common.Map(shadowsocksUsers, func(user option.ShadowsocksUser) string { + return user.Password + })) + + if err != nil { + return err + } + h.users = shadowsocksUsers + return nil +} diff --git a/option/config.go b/option/config.go index 3f5d7602..f8d468ad 100644 --- a/option/config.go +++ b/option/config.go @@ -6,6 +6,10 @@ import ( "github.com/sagernet/sing/common/json" ) +type TomConfig struct { + ListenPort uint16 `json:"listen_port,omitempty"` +} + type _Options struct { RawMessage json.RawMessage `json:"-"` Schema string `json:"$schema,omitempty"` @@ -16,6 +20,7 @@ type _Options struct { Outbounds []Outbound `json:"outbounds,omitempty"` Route *RouteOptions `json:"route,omitempty"` Experimental *ExperimentalOptions `json:"experimental,omitempty"` + TomConfig *TomConfig `json:tom_config,omitempty` } type Options _Options diff --git a/proto/cybertom.proto b/proto/cybertom.proto new file mode 100644 index 00000000..c7daa15a --- /dev/null +++ b/proto/cybertom.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; +option go_package = "/cybertom"; + +package proto; + +service CyberTom { + rpc UpdateUser (UpdateUserRequest) returns (UpdateUserReply) {} +} + +message UpdateUserRequest { + string tag = 1; + repeated string users = 2; + repeated string passwords = 3; +} + +message UpdateUserReply { +} diff --git a/proto/cybertom/cybertom.pb.go b/proto/cybertom/cybertom.pb.go new file mode 100644 index 00000000..c2b8bd9d --- /dev/null +++ b/proto/cybertom/cybertom.pb.go @@ -0,0 +1,220 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.27.3 +// source: cybertom.proto + +package cybertom + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type UpdateUserRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` + Users []string `protobuf:"bytes,2,rep,name=users,proto3" json:"users,omitempty"` + Passwords []string `protobuf:"bytes,3,rep,name=passwords,proto3" json:"passwords,omitempty"` +} + +func (x *UpdateUserRequest) Reset() { + *x = UpdateUserRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cybertom_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateUserRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateUserRequest) ProtoMessage() {} + +func (x *UpdateUserRequest) ProtoReflect() protoreflect.Message { + mi := &file_cybertom_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateUserRequest.ProtoReflect.Descriptor instead. +func (*UpdateUserRequest) Descriptor() ([]byte, []int) { + return file_cybertom_proto_rawDescGZIP(), []int{0} +} + +func (x *UpdateUserRequest) GetTag() string { + if x != nil { + return x.Tag + } + return "" +} + +func (x *UpdateUserRequest) GetUsers() []string { + if x != nil { + return x.Users + } + return nil +} + +func (x *UpdateUserRequest) GetPasswords() []string { + if x != nil { + return x.Passwords + } + return nil +} + +type UpdateUserReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *UpdateUserReply) Reset() { + *x = UpdateUserReply{} + if protoimpl.UnsafeEnabled { + mi := &file_cybertom_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateUserReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateUserReply) ProtoMessage() {} + +func (x *UpdateUserReply) ProtoReflect() protoreflect.Message { + mi := &file_cybertom_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateUserReply.ProtoReflect.Descriptor instead. +func (*UpdateUserReply) Descriptor() ([]byte, []int) { + return file_cybertom_proto_rawDescGZIP(), []int{1} +} + +var File_cybertom_proto protoreflect.FileDescriptor + +var file_cybertom_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x63, 0x79, 0x62, 0x65, 0x72, 0x74, 0x6f, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x55, 0x73, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, + 0x74, 0x61, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x12, 0x14, + 0x0a, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x75, + 0x73, 0x65, 0x72, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, + 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, + 0x64, 0x73, 0x22, 0x11, 0x0a, 0x0f, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x32, 0x4c, 0x0a, 0x08, 0x43, 0x79, 0x62, 0x65, 0x72, 0x54, 0x6f, + 0x6d, 0x12, 0x40, 0x0a, 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x12, + 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2f, 0x63, 0x79, 0x62, 0x65, 0x72, 0x74, 0x6f, 0x6d, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_cybertom_proto_rawDescOnce sync.Once + file_cybertom_proto_rawDescData = file_cybertom_proto_rawDesc +) + +func file_cybertom_proto_rawDescGZIP() []byte { + file_cybertom_proto_rawDescOnce.Do(func() { + file_cybertom_proto_rawDescData = protoimpl.X.CompressGZIP(file_cybertom_proto_rawDescData) + }) + return file_cybertom_proto_rawDescData +} + +var file_cybertom_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_cybertom_proto_goTypes = []any{ + (*UpdateUserRequest)(nil), // 0: proto.UpdateUserRequest + (*UpdateUserReply)(nil), // 1: proto.UpdateUserReply +} +var file_cybertom_proto_depIdxs = []int32{ + 0, // 0: proto.CyberTom.UpdateUser:input_type -> proto.UpdateUserRequest + 1, // 1: proto.CyberTom.UpdateUser:output_type -> proto.UpdateUserReply + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_cybertom_proto_init() } +func file_cybertom_proto_init() { + if File_cybertom_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_cybertom_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*UpdateUserRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cybertom_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*UpdateUserReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cybertom_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_cybertom_proto_goTypes, + DependencyIndexes: file_cybertom_proto_depIdxs, + MessageInfos: file_cybertom_proto_msgTypes, + }.Build() + File_cybertom_proto = out.File + file_cybertom_proto_rawDesc = nil + file_cybertom_proto_goTypes = nil + file_cybertom_proto_depIdxs = nil +} diff --git a/proto/cybertom/cybertom_grpc.pb.go b/proto/cybertom/cybertom_grpc.pb.go new file mode 100644 index 00000000..04f81d9b --- /dev/null +++ b/proto/cybertom/cybertom_grpc.pb.go @@ -0,0 +1,121 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.27.3 +// source: cybertom.proto + +package cybertom + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + CyberTom_UpdateUser_FullMethodName = "/proto.CyberTom/UpdateUser" +) + +// CyberTomClient is the client API for CyberTom service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CyberTomClient interface { + UpdateUser(ctx context.Context, in *UpdateUserRequest, opts ...grpc.CallOption) (*UpdateUserReply, error) +} + +type cyberTomClient struct { + cc grpc.ClientConnInterface +} + +func NewCyberTomClient(cc grpc.ClientConnInterface) CyberTomClient { + return &cyberTomClient{cc} +} + +func (c *cyberTomClient) UpdateUser(ctx context.Context, in *UpdateUserRequest, opts ...grpc.CallOption) (*UpdateUserReply, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(UpdateUserReply) + err := c.cc.Invoke(ctx, CyberTom_UpdateUser_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CyberTomServer is the server API for CyberTom service. +// All implementations must embed UnimplementedCyberTomServer +// for forward compatibility. +type CyberTomServer interface { + UpdateUser(context.Context, *UpdateUserRequest) (*UpdateUserReply, error) + mustEmbedUnimplementedCyberTomServer() +} + +// UnimplementedCyberTomServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedCyberTomServer struct{} + +func (UnimplementedCyberTomServer) UpdateUser(context.Context, *UpdateUserRequest) (*UpdateUserReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateUser not implemented") +} +func (UnimplementedCyberTomServer) mustEmbedUnimplementedCyberTomServer() {} +func (UnimplementedCyberTomServer) testEmbeddedByValue() {} + +// UnsafeCyberTomServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CyberTomServer will +// result in compilation errors. +type UnsafeCyberTomServer interface { + mustEmbedUnimplementedCyberTomServer() +} + +func RegisterCyberTomServer(s grpc.ServiceRegistrar, srv CyberTomServer) { + // If the following call pancis, it indicates UnimplementedCyberTomServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&CyberTom_ServiceDesc, srv) +} + +func _CyberTom_UpdateUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateUserRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CyberTomServer).UpdateUser(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CyberTom_UpdateUser_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CyberTomServer).UpdateUser(ctx, req.(*UpdateUserRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// CyberTom_ServiceDesc is the grpc.ServiceDesc for CyberTom service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var CyberTom_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.CyberTom", + HandlerType: (*CyberTomServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "UpdateUser", + Handler: _CyberTom_UpdateUser_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "cybertom.proto", +}