Skip to content

Commit

Permalink
refactor the project directory structure
Browse files Browse the repository at this point in the history
refactor netlist & domain data loader
  • Loading branch information
IrineSistiana committed Feb 8, 2021
1 parent 9b117ef commit 473da64
Show file tree
Hide file tree
Showing 72 changed files with 1,606 additions and 1,367 deletions.
4 changes: 2 additions & 2 deletions dispatcher/coremain/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"fmt"
"github.com/IrineSistiana/mosdns/dispatcher/handler"
"github.com/IrineSistiana/mosdns/dispatcher/mlog"
"github.com/IrineSistiana/mosdns/dispatcher/pkg/concurrent_limiter"
_ "github.com/IrineSistiana/mosdns/dispatcher/plugin"
"github.com/IrineSistiana/mosdns/dispatcher/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
Expand Down Expand Up @@ -93,7 +93,7 @@ func loadConfig(f string, depth int) {
if n < 1 {
n = 1
}
pool := utils.NewConcurrentLimiter(n)
pool := concurrent_limiter.NewConcurrentLimiter(n)
wg := new(sync.WaitGroup)
for i, pluginConfig := range c.Plugin {
if len(pluginConfig.Tag) == 0 || len(pluginConfig.Type) == 0 {
Expand Down
69 changes: 69 additions & 0 deletions dispatcher/pkg/concurrent_limiter/client_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (C) 2020-2021, IrineSistiana
//
// This file is part of mosdns.
//
// mosdns is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// mosdns is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package concurrent_limiter

import (
"github.com/IrineSistiana/mosdns/dispatcher/pkg/concurrent_map"
)

type ClientQueryLimiter struct {
maxQueries int
m *concurrent_map.ConcurrentMap
}

func NewClientQueryLimiter(maxQueries int) *ClientQueryLimiter {
return &ClientQueryLimiter{
maxQueries: maxQueries,
m: concurrent_map.NewConcurrentMap(64),
}
}

func (l *ClientQueryLimiter) Acquire(key string) bool {
return l.m.TestAndSet(key, l.acquireTestAndSet)
}

func (l *ClientQueryLimiter) acquireTestAndSet(v interface{}, ok bool) (newV interface{}, wantUpdate, passed bool) {
n := 0
if ok {
n = v.(int)
}
if n >= l.maxQueries {
return nil, false, false
}
n++
return n, true, true
}

func (l *ClientQueryLimiter) doneTestAndSet(v interface{}, ok bool) (newV interface{}, wantUpdate, passed bool) {
if !ok {
panic("ClientQueryLimiter doneTestAndSet: value is not exist")
}
n := v.(int)
n--
if n < 0 {
panic("ClientQueryLimiter doneTestAndSet: value becomes negative")
}
if n == 0 {
return nil, true, true
}
return n, true, true
}

func (l *ClientQueryLimiter) Done(key string) {
l.m.TestAndSet(key, l.doneTestAndSet)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package concurrent_limiter

import (
"strconv"
Expand Down
55 changes: 55 additions & 0 deletions dispatcher/pkg/concurrent_limiter/concurrent_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) 2020-2021, IrineSistiana
//
// This file is part of mosdns.
//
// mosdns is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// mosdns is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package concurrent_limiter

import "fmt"

// ConcurrentLimiter is a soft limiter.
type ConcurrentLimiter struct {
bucket chan struct{}
}

// NewConcurrentLimiter returns a ConcurrentLimiter, max must > 0.
func NewConcurrentLimiter(max int) *ConcurrentLimiter {
if max <= 0 {
panic(fmt.Sprintf("ConcurrentLimiter: invalid max arg: %d", max))
}

bucket := make(chan struct{}, max)
for i := 0; i < max; i++ {
bucket <- struct{}{}
}

return &ConcurrentLimiter{bucket: bucket}
}

func (l *ConcurrentLimiter) Wait() <-chan struct{} {
return l.bucket
}

func (l *ConcurrentLimiter) Done() {
select {
case l.bucket <- struct{}{}:
default:
panic("ConcurrentLimiter: bucket overflow")
}
}

func (l *ConcurrentLimiter) Available() int {
return len(l.bucket)
}
34 changes: 34 additions & 0 deletions dispatcher/pkg/concurrent_limiter/concurrent_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package concurrent_limiter

import (
"context"
"sync"
"testing"
"time"
)

func Test_ConcurrentLimiter_acquire_release(t *testing.T) {
l := NewConcurrentLimiter(500)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

wg := new(sync.WaitGroup)
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
select {
case <-l.Wait():
time.Sleep(time.Millisecond * 200)
l.Done()
case <-ctx.Done():
t.Fail()
}
}()
}

wg.Wait()
if l.Available() != 500 {
t.Fatal("token leaked")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package utils
package concurrent_lru

import (
lru2 "github.com/IrineSistiana/mosdns/dispatcher/pkg/lru"
"hash/maphash"
"sync"
)
Expand All @@ -40,7 +41,7 @@ func NewConcurrentLRU(
for i := range cl.l {
cl.l[i] = &shardedLRU{
onGet: onGet,
lru: NewLRU(maxSizePerShard, onEvict),
lru: lru2.NewLRU(maxSizePerShard, onEvict),
}
}

Expand Down Expand Up @@ -95,7 +96,7 @@ type shardedLRU struct {
onGet func(key string, v interface{}) interface{}

sync.Mutex
lru *LRU
lru *lru2.LRU
}

func (sl *shardedLRU) Add(key string, v interface{}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package concurrent_lru

import (
"reflect"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package utils
package concurrent_map

import (
"hash/maphash"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package concurrent_map

import (
"strconv"
Expand Down
29 changes: 15 additions & 14 deletions dispatcher/utils/net_io.go → dispatcher/pkg/dnsutils/net_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package utils
package dnsutils

import (
"encoding/binary"
"fmt"
"github.com/IrineSistiana/mosdns/dispatcher/pkg/pool"
"github.com/miekg/dns"
"io"
"net"
Expand Down Expand Up @@ -60,8 +61,8 @@ func (e *IOErr) Unwrap() error {
// An io err will be wrapped into an IOErr.
// IsIOErr(err) can check and unwrap the inner io err.
func ReadUDPMsgFrom(c net.PacketConn, bufSize int) (m *dns.Msg, from net.Addr, n int, err error) {
buf := GetMsgBuf(bufSize)
defer ReleaseMsgBuf(buf)
buf := pool.GetMsgBuf(bufSize)
defer pool.ReleaseMsgBuf(buf)

n, from, err = c.ReadFrom(buf)
if err != nil {
Expand All @@ -84,8 +85,8 @@ func ReadUDPMsgFrom(c net.PacketConn, bufSize int) (m *dns.Msg, from net.Addr, n

// ReadMsgFromUDP See ReadUDPMsgFrom.
func ReadMsgFromUDP(c io.Reader, bufSize int) (m *dns.Msg, n int, err error) {
buf := GetMsgBuf(bufSize)
defer ReleaseMsgBuf(buf)
buf := pool.GetMsgBuf(bufSize)
defer pool.ReleaseMsgBuf(buf)

n, err = c.Read(buf)
if err != nil {
Expand All @@ -108,11 +109,11 @@ func ReadMsgFromUDP(c io.Reader, bufSize int) (m *dns.Msg, n int, err error) {
// An io err will be wrapped into an IOErr.
// IsIOErr(err) can check and unwrap the inner io err.
func WriteMsgToUDP(c io.Writer, m *dns.Msg) (n int, err error) {
mRaw, buf, err := PackBuffer(m)
mRaw, buf, err := pool.PackBuffer(m)
if err != nil {
return 0, err
}
defer ReleaseMsgBuf(buf)
defer pool.ReleaseMsgBuf(buf)

return WriteRawMsgToUDP(c, mRaw)
}
Expand All @@ -128,11 +129,11 @@ func WriteRawMsgToUDP(c io.Writer, b []byte) (n int, err error) {

// WriteUDPMsgTo See WriteMsgToUDP.
func WriteUDPMsgTo(m *dns.Msg, c net.PacketConn, to net.Addr) (n int, err error) {
mRaw, buf, err := PackBuffer(m)
mRaw, buf, err := pool.PackBuffer(m)
if err != nil {
return 0, err
}
defer ReleaseMsgBuf(buf)
defer pool.ReleaseMsgBuf(buf)

n, err = c.WriteTo(mRaw, to)
if err != nil {
Expand Down Expand Up @@ -161,8 +162,8 @@ func ReadMsgFromTCP(c io.Reader) (m *dns.Msg, n int, err error) {
return nil, n, dns.ErrShortRead
}

buf := GetMsgBuf(int(length))
defer ReleaseMsgBuf(buf)
buf := pool.GetMsgBuf(int(length))
defer pool.ReleaseMsgBuf(buf)

n2, err := io.ReadFull(c, buf)
n = n + n2
Expand All @@ -185,11 +186,11 @@ func ReadMsgFromTCP(c io.Reader) (m *dns.Msg, n int, err error) {
// An io err will be wrapped into an IOErr.
// IsIOErr(err) can check and unwrap the inner io err.
func WriteMsgToTCP(c io.Writer, m *dns.Msg) (n int, err error) {
mRaw, buf, err := PackBuffer(m)
mRaw, buf, err := pool.PackBuffer(m)
if err != nil {
return 0, err
}
defer ReleaseMsgBuf(buf)
defer pool.ReleaseMsgBuf(buf)

return WriteRawMsgToTCP(c, mRaw)
}
Expand Down Expand Up @@ -222,5 +223,5 @@ func WriteRawMsgToTCP(c io.Writer, b []byte) (n int, err error) {
}

var (
tcpWriteBufPool = NewBytesBufPool(512 + 2)
tcpWriteBufPool = pool.NewBytesBufPool(512 + 2)
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package utils

package dnsutils

import (
"bytes"
Expand Down
Loading

0 comments on commit 473da64

Please sign in to comment.