mirror of
https://github.com/safing/portmaster
synced 2025-09-01 18:19:12 +00:00
Rework tcp resolver to make it cleaner
From PR Review https://github.com/safing/portmaster/pull/78
This commit is contained in:
parent
7c6c4552aa
commit
6298d1df30
5 changed files with 311 additions and 276 deletions
|
@ -261,7 +261,7 @@ func handleRequest(ctx context.Context, w dns.ResponseWriter, query *dns.Msg) er
|
|||
rrCache, err := resolver.Resolve(ctx, q)
|
||||
if err != nil {
|
||||
// TODO: analyze nxdomain requests, malware could be trying DGA-domains
|
||||
tracer.Warningf("nameserver: %s requested %s%s: %s", conn.Process(), q.FQDN, q.QType, err)
|
||||
tracer.Debugf("nameserver: %s requested %s%s: %s", conn.Process(), q.FQDN, q.QType, err)
|
||||
|
||||
if errors.Is(err, resolver.ErrBlocked) {
|
||||
conn.Block(err.Error())
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
package resolver
|
||||
|
||||
// DISABLE TESTING FOR NOW: find a way to have tests with the module system
|
||||
|
||||
// import (
|
||||
// "testing"
|
||||
// "time"
|
||||
//
|
||||
// "github.com/miekg/dns"
|
||||
// )
|
||||
|
||||
// func TestResolve(t *testing.T) {
|
||||
// Resolve("google.com.", dns.Type(dns.TypeA), 0)
|
||||
// time.Sleep(200 * time.Millisecond)
|
||||
// }
|
|
@ -50,8 +50,7 @@ func TestClientPooling(t *testing.T) {
|
|||
}
|
||||
|
||||
// create separate resolver for this test
|
||||
resolver, _, err := createResolver("dot://9.9.9.9:853?verify=dns.quad9.net&name=Quad9&blockedif=empty", "config")
|
||||
// resolver, _, err := createResolver("dot://1.1.1.2:853?verify=cloudflare-dns.com&name=Cloudflare&blockedif=zeroip", "config")
|
||||
resolver, _, err := createResolver(testResolver, "config")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -22,14 +22,13 @@ const (
|
|||
type TCPResolver struct {
|
||||
BasicResolverConn
|
||||
|
||||
clientTTL time.Duration
|
||||
dnsClient *dns.Client
|
||||
dnsConnection *dns.Conn
|
||||
connInstanceID *uint32
|
||||
clientTTL time.Duration
|
||||
dnsClient *dns.Client
|
||||
|
||||
clientStarted *abool.AtomicBool
|
||||
connInstanceID *uint32
|
||||
queries chan *dns.Msg
|
||||
inFlightQueries map[uint16]*InFlightQuery
|
||||
clientStarted *abool.AtomicBool
|
||||
}
|
||||
|
||||
// InFlightQuery represents an in flight query of a TCPResolver.
|
||||
|
@ -86,254 +85,10 @@ func (tr *TCPResolver) UseTLS() *TCPResolver {
|
|||
return tr
|
||||
}
|
||||
|
||||
func (tr *TCPResolver) client(workerCtx context.Context) error { //nolint:gocognit,gocyclo // TODO
|
||||
connTimer := time.NewTimer(tr.clientTTL)
|
||||
connClosing := abool.New()
|
||||
var connCtx context.Context
|
||||
var cancelConnCtx func()
|
||||
var recycleConn bool
|
||||
var shuttingDown bool
|
||||
var failCnt int
|
||||
var incoming = make(chan *dns.Msg, 100)
|
||||
|
||||
connMgmt:
|
||||
for {
|
||||
// cleanup old connection
|
||||
if tr.dnsConnection != nil {
|
||||
connClosing.Set()
|
||||
_ = tr.dnsConnection.Close()
|
||||
if cancelConnCtx != nil {
|
||||
cancelConnCtx()
|
||||
}
|
||||
|
||||
tr.dnsConnection = nil
|
||||
atomic.AddUint32(tr.connInstanceID, 1)
|
||||
}
|
||||
|
||||
// check if we are shutting down or failing
|
||||
if shuttingDown || failCnt >= FailThreshold || tr.IsFailing() {
|
||||
// reply to all waiting queries
|
||||
tr.Lock()
|
||||
for id, inFlight := range tr.inFlightQueries {
|
||||
close(inFlight.Response)
|
||||
delete(tr.inFlightQueries, id)
|
||||
}
|
||||
tr.clientStarted.UnSet() // in lock to guarantee to set before submitQuery proceeds
|
||||
tr.Unlock()
|
||||
|
||||
// hint network environment at failed connection
|
||||
if failCnt >= FailThreshold {
|
||||
netenv.ReportFailedConnection()
|
||||
}
|
||||
|
||||
// The linter said so. Don't even...
|
||||
if cancelConnCtx != nil {
|
||||
cancelConnCtx()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// wait until there is something to do
|
||||
tr.Lock()
|
||||
waiting := len(tr.inFlightQueries)
|
||||
tr.Unlock()
|
||||
if waiting > 0 {
|
||||
// queue abandoned queries
|
||||
ignoreBefore := time.Now().Add(-ignoreQueriesAfter)
|
||||
currentConnInstanceID := atomic.LoadUint32(tr.connInstanceID)
|
||||
tr.Lock()
|
||||
for id, inFlight := range tr.inFlightQueries {
|
||||
if inFlight.Started.Before(ignoreBefore) {
|
||||
// remove
|
||||
delete(tr.inFlightQueries, id)
|
||||
} else if inFlight.ConnInstanceID != currentConnInstanceID {
|
||||
inFlight.ConnInstanceID = currentConnInstanceID
|
||||
// re-inject
|
||||
select {
|
||||
case tr.queries <- inFlight.Msg:
|
||||
default:
|
||||
log.Warningf("resolver: failed to re-inject abandoned query to %s", tr.resolver.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
tr.Unlock()
|
||||
} else {
|
||||
// wait for first query
|
||||
select {
|
||||
case <-workerCtx.Done():
|
||||
// abort
|
||||
shuttingDown = true
|
||||
continue connMgmt
|
||||
case msg := <-tr.queries:
|
||||
// re-insert, we will handle later
|
||||
select {
|
||||
case tr.queries <- msg:
|
||||
default:
|
||||
log.Warningf("resolver: failed to re-inject waking query to %s", tr.resolver.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create connection
|
||||
connCtx, cancelConnCtx = context.WithCancel(workerCtx)
|
||||
// refresh dialer for authenticated local address
|
||||
tr.dnsClient.Dialer = &net.Dialer{
|
||||
LocalAddr: getLocalAddr("tcp"),
|
||||
Timeout: defaultConnectTimeout,
|
||||
KeepAlive: defaultClientTTL,
|
||||
}
|
||||
// connect
|
||||
c, err := tr.dnsClient.Dial(tr.resolver.ServerAddress)
|
||||
if err != nil {
|
||||
tr.ReportFailure()
|
||||
failCnt++
|
||||
if tr.IsFailing() {
|
||||
shuttingDown = true
|
||||
}
|
||||
log.Debugf("resolver: failed to connect to %s (%s)", tr.resolver.Name, tr.resolver.ServerAddress)
|
||||
netenv.ReportFailedConnection()
|
||||
continue connMgmt
|
||||
}
|
||||
tr.dnsConnection = c
|
||||
log.Debugf("resolver: connected to %s (%s)", tr.resolver.Name, tr.dnsConnection.RemoteAddr())
|
||||
|
||||
// hint network environment at successful connection
|
||||
netenv.ReportSuccessfulConnection()
|
||||
|
||||
// reset timer
|
||||
connTimer.Stop()
|
||||
select {
|
||||
case <-connTimer.C: // try to empty the timer
|
||||
default:
|
||||
}
|
||||
connTimer.Reset(tr.clientTTL)
|
||||
recycleConn = false
|
||||
|
||||
// start reader
|
||||
module.StartWorker("dns client reader", func(ctx context.Context) error {
|
||||
conn := tr.dnsConnection
|
||||
for {
|
||||
msg, err := conn.ReadMsg()
|
||||
if err != nil {
|
||||
if connClosing.SetToIf(false, true) {
|
||||
if cancelConnCtx != nil {
|
||||
cancelConnCtx()
|
||||
}
|
||||
tr.ReportFailure()
|
||||
failCnt++
|
||||
if tr.IsFailing() {
|
||||
shuttingDown = true
|
||||
}
|
||||
log.Warningf("resolver: read error from %s (%s): %s", tr.resolver.Name, tr.dnsConnection.RemoteAddr(), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
incoming <- msg
|
||||
}
|
||||
})
|
||||
|
||||
// query management
|
||||
for {
|
||||
select {
|
||||
case <-workerCtx.Done():
|
||||
// shutting down
|
||||
shuttingDown = true
|
||||
continue connMgmt
|
||||
case <-connCtx.Done():
|
||||
// connection error
|
||||
continue connMgmt
|
||||
case <-connTimer.C:
|
||||
// client TTL expired, recycle connection
|
||||
recycleConn = true
|
||||
// trigger check
|
||||
select {
|
||||
case incoming <- nil:
|
||||
default:
|
||||
// quere is full anyway, do nothing
|
||||
}
|
||||
|
||||
case msg := <-tr.queries:
|
||||
// write query
|
||||
_ = tr.dnsConnection.SetWriteDeadline(time.Now().Add(tr.dnsClient.WriteTimeout))
|
||||
err := tr.dnsConnection.WriteMsg(msg)
|
||||
if err != nil {
|
||||
if connClosing.SetToIf(false, true) {
|
||||
if cancelConnCtx != nil {
|
||||
cancelConnCtx()
|
||||
}
|
||||
tr.ReportFailure()
|
||||
failCnt++
|
||||
if tr.IsFailing() {
|
||||
shuttingDown = true
|
||||
}
|
||||
log.Warningf("resolver: write error to %s (%s): %s", tr.resolver.Name, tr.dnsConnection.RemoteAddr(), err)
|
||||
}
|
||||
continue connMgmt
|
||||
}
|
||||
|
||||
case msg := <-incoming:
|
||||
|
||||
if msg != nil {
|
||||
// handle query from resolver
|
||||
tr.Lock()
|
||||
inFlight, ok := tr.inFlightQueries[msg.Id]
|
||||
if ok {
|
||||
delete(tr.inFlightQueries, msg.Id)
|
||||
}
|
||||
tr.Unlock()
|
||||
|
||||
if ok {
|
||||
select {
|
||||
case inFlight.Response <- msg:
|
||||
failCnt = 0 // reset fail counter
|
||||
// responded!
|
||||
default:
|
||||
// save to cache, if enabled
|
||||
if !inFlight.Query.NoCaching {
|
||||
// persist to database
|
||||
rrCache := inFlight.MakeCacheRecord(msg)
|
||||
rrCache.Clean(600)
|
||||
err = rrCache.Save()
|
||||
if err != nil {
|
||||
log.Warningf(
|
||||
"resolver: failed to cache RR for %s%s: %s",
|
||||
inFlight.Query.FQDN,
|
||||
inFlight.Query.QType.String(),
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Debugf(
|
||||
"resolver: received possibly unsolicited reply from %s (%s): txid=%d q=%+v",
|
||||
tr.resolver.Name,
|
||||
tr.dnsConnection.RemoteAddr(),
|
||||
msg.Id,
|
||||
msg.Question,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// check if we have finished all queries and want to recycle conn
|
||||
if recycleConn {
|
||||
tr.Lock()
|
||||
activeQueries := len(tr.inFlightQueries)
|
||||
tr.Unlock()
|
||||
if activeQueries == 0 {
|
||||
log.Debugf("resolver: recycling conn to %s (%s)", tr.resolver.Name, tr.dnsConnection.RemoteAddr())
|
||||
continue connMgmt
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (tr *TCPResolver) submitQuery(_ context.Context, q *Query) *InFlightQuery {
|
||||
// make sure client is started
|
||||
tr.startClient()
|
||||
|
||||
// create msg
|
||||
msg := &dns.Msg{}
|
||||
msg.SetQuestion(q.FQDN, uint16(q.QType))
|
||||
|
@ -348,17 +103,21 @@ func (tr *TCPResolver) submitQuery(_ context.Context, q *Query) *InFlightQuery {
|
|||
ConnInstanceID: atomic.LoadUint32(tr.connInstanceID),
|
||||
}
|
||||
tr.Lock()
|
||||
// check for existing query
|
||||
for i := 0; i < 10; i++ { // don't try forever
|
||||
_, exists := tr.inFlightQueries[msg.Id]
|
||||
if !exists {
|
||||
break // we are unique, yay!
|
||||
}
|
||||
msg.Id = dns.Id() // regenerate ID
|
||||
}
|
||||
// add query to in flight registry
|
||||
tr.inFlightQueries[msg.Id] = inFlight
|
||||
tr.Unlock()
|
||||
|
||||
// submit msg for writing
|
||||
tr.queries <- msg
|
||||
|
||||
// make sure client is started
|
||||
if tr.clientStarted.SetToIf(false, true) {
|
||||
module.StartServiceWorker("dns client", 10*time.Millisecond, tr.client)
|
||||
}
|
||||
|
||||
return inFlight
|
||||
}
|
||||
|
||||
|
@ -386,3 +145,295 @@ func (tr *TCPResolver) Query(ctx context.Context, q *Query) (*RRCache, error) {
|
|||
|
||||
return inFlight.MakeCacheRecord(reply), nil
|
||||
}
|
||||
|
||||
type tcpResolverConnMgr struct {
|
||||
tr *TCPResolver
|
||||
workerCtx context.Context
|
||||
conn *dns.Conn
|
||||
connCtx context.Context
|
||||
cancelConnCtx func()
|
||||
connTimer *time.Timer
|
||||
connClosing *abool.AtomicBool
|
||||
responses chan *dns.Msg
|
||||
failCnt int
|
||||
}
|
||||
|
||||
func (tr *TCPResolver) startClient() {
|
||||
if tr.clientStarted.SetToIf(false, true) {
|
||||
mgr := &tcpResolverConnMgr{
|
||||
tr: tr,
|
||||
connTimer: time.NewTimer(tr.clientTTL),
|
||||
connClosing: abool.New(),
|
||||
responses: make(chan *dns.Msg, 100),
|
||||
}
|
||||
module.StartServiceWorker("dns client", 10*time.Millisecond, mgr.run)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) run(workerCtx context.Context) error {
|
||||
mgr.workerCtx = workerCtx
|
||||
|
||||
// connection lifecycle loop
|
||||
for {
|
||||
// check if we are failing
|
||||
if mgr.failCnt >= FailThreshold || mgr.tr.IsFailing() {
|
||||
mgr.shutdown()
|
||||
return nil
|
||||
}
|
||||
|
||||
// clean up anything that is left over
|
||||
mgr.cleanupConnection()
|
||||
|
||||
// wait for work before creating connection
|
||||
proceed := mgr.waitForWork()
|
||||
if !proceed {
|
||||
mgr.shutdown()
|
||||
return nil
|
||||
}
|
||||
|
||||
// create connection
|
||||
success := mgr.establishConnection()
|
||||
if !success {
|
||||
mgr.failCnt++
|
||||
continue
|
||||
}
|
||||
|
||||
// hint network environment at successful connection
|
||||
netenv.ReportSuccessfulConnection()
|
||||
|
||||
// handle queries
|
||||
proceed = mgr.queryHandler()
|
||||
if !proceed {
|
||||
mgr.shutdown()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) cleanupConnection() {
|
||||
// cleanup old connection
|
||||
if mgr.conn != nil {
|
||||
mgr.connClosing.Set() // silence connection errors
|
||||
_ = mgr.conn.Close()
|
||||
if mgr.cancelConnCtx != nil {
|
||||
mgr.cancelConnCtx()
|
||||
}
|
||||
|
||||
// delete old connection
|
||||
mgr.conn = nil
|
||||
|
||||
// increase instance counter
|
||||
atomic.AddUint32(mgr.tr.connInstanceID, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) shutdown() {
|
||||
mgr.cleanupConnection()
|
||||
|
||||
// reply to all waiting queries
|
||||
mgr.tr.Lock()
|
||||
for id, inFlight := range mgr.tr.inFlightQueries {
|
||||
close(inFlight.Response)
|
||||
delete(mgr.tr.inFlightQueries, id)
|
||||
}
|
||||
mgr.tr.clientStarted.UnSet() // in lock to guarantee to set before submitQuery proceeds
|
||||
atomic.AddUint32(mgr.tr.connInstanceID, 1) // increase instance counter
|
||||
mgr.tr.Unlock()
|
||||
|
||||
// hint network environment at failed connection
|
||||
if mgr.failCnt >= FailThreshold {
|
||||
netenv.ReportFailedConnection()
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) waitForWork() (proceed bool) {
|
||||
// wait until there is something to do
|
||||
mgr.tr.Lock()
|
||||
waiting := len(mgr.tr.inFlightQueries)
|
||||
mgr.tr.Unlock()
|
||||
if waiting > 0 {
|
||||
// queue abandoned queries
|
||||
ignoreBefore := time.Now().Add(-ignoreQueriesAfter)
|
||||
currentConnInstanceID := atomic.LoadUint32(mgr.tr.connInstanceID)
|
||||
mgr.tr.Lock()
|
||||
for id, inFlight := range mgr.tr.inFlightQueries {
|
||||
if inFlight.Started.Before(ignoreBefore) {
|
||||
// remove old queries
|
||||
close(inFlight.Response)
|
||||
delete(mgr.tr.inFlightQueries, id)
|
||||
} else if inFlight.ConnInstanceID != currentConnInstanceID {
|
||||
inFlight.ConnInstanceID = currentConnInstanceID
|
||||
// re-inject queries that died with a previously failed connection
|
||||
select {
|
||||
case mgr.tr.queries <- inFlight.Msg:
|
||||
default:
|
||||
log.Warningf("resolver: failed to re-inject abandoned query to %s", mgr.tr.resolver.Name)
|
||||
}
|
||||
}
|
||||
// in-flight queries that match the connection instance ID are not changed. They are already in the queue.
|
||||
}
|
||||
mgr.tr.Unlock()
|
||||
} else {
|
||||
// wait for first query
|
||||
select {
|
||||
case <-mgr.workerCtx.Done():
|
||||
return false
|
||||
case msg := <-mgr.tr.queries:
|
||||
// re-insert query, we will handle it later
|
||||
select {
|
||||
case mgr.tr.queries <- msg:
|
||||
default:
|
||||
log.Warningf("resolver: failed to re-inject waking query to %s", mgr.tr.resolver.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) establishConnection() (success bool) {
|
||||
// create connection
|
||||
mgr.connCtx, mgr.cancelConnCtx = context.WithCancel(mgr.workerCtx)
|
||||
mgr.connClosing = abool.New()
|
||||
// refresh dialer to set an authenticated local address
|
||||
// TODO: lock dnsClient (only manager should run at any time, so this should not be an issue)
|
||||
mgr.tr.dnsClient.Dialer = &net.Dialer{
|
||||
LocalAddr: getLocalAddr("tcp"),
|
||||
Timeout: defaultConnectTimeout,
|
||||
KeepAlive: defaultClientTTL,
|
||||
}
|
||||
// connect
|
||||
c, err := mgr.tr.dnsClient.Dial(mgr.tr.resolver.ServerAddress)
|
||||
if err != nil {
|
||||
log.Debugf("resolver: failed to connect to %s (%s)", mgr.tr.resolver.Name, mgr.tr.resolver.ServerAddress)
|
||||
return false
|
||||
}
|
||||
mgr.conn = c
|
||||
log.Debugf("resolver: connected to %s (%s)", mgr.tr.resolver.Name, mgr.conn.RemoteAddr())
|
||||
|
||||
// reset timer
|
||||
mgr.connTimer.Stop()
|
||||
select {
|
||||
case <-mgr.connTimer.C: // try to empty the timer
|
||||
default:
|
||||
}
|
||||
mgr.connTimer.Reset(mgr.tr.clientTTL)
|
||||
|
||||
// start reader
|
||||
module.StartServiceWorker("dns client reader", 10*time.Millisecond, mgr.msgReader)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) queryHandler() (proceed bool) { //nolint:gocognit
|
||||
var readyToRecycle bool
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-mgr.workerCtx.Done():
|
||||
// module shutdown
|
||||
return false
|
||||
|
||||
case <-mgr.connCtx.Done():
|
||||
// connection error
|
||||
return true
|
||||
|
||||
case <-mgr.connTimer.C:
|
||||
// connection TTL reached, rebuild connection
|
||||
// but handle all in flight queries first
|
||||
readyToRecycle = true
|
||||
// trigger check
|
||||
select {
|
||||
case mgr.responses <- nil:
|
||||
default:
|
||||
// queue is full, check will be triggered anyway
|
||||
}
|
||||
|
||||
case msg := <-mgr.tr.queries:
|
||||
// write query
|
||||
_ = mgr.conn.SetWriteDeadline(time.Now().Add(mgr.tr.dnsClient.WriteTimeout))
|
||||
err := mgr.conn.WriteMsg(msg)
|
||||
if err != nil {
|
||||
if mgr.connClosing.SetToIf(false, true) {
|
||||
mgr.cancelConnCtx()
|
||||
log.Warningf("resolver: write error to %s (%s): %s", mgr.tr.resolver.Name, mgr.conn.RemoteAddr(), err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
case msg := <-mgr.responses:
|
||||
if msg != nil { // nil messages only trigger the recycle check
|
||||
// handle query from resolver
|
||||
mgr.tr.Lock()
|
||||
inFlight, ok := mgr.tr.inFlightQueries[msg.Id]
|
||||
if ok {
|
||||
delete(mgr.tr.inFlightQueries, msg.Id)
|
||||
}
|
||||
mgr.tr.Unlock()
|
||||
|
||||
if ok {
|
||||
select {
|
||||
case inFlight.Response <- msg:
|
||||
mgr.failCnt = 0 // reset fail counter
|
||||
// responded!
|
||||
default:
|
||||
// save to cache, if enabled
|
||||
if !inFlight.Query.NoCaching {
|
||||
// persist to database
|
||||
rrCache := inFlight.MakeCacheRecord(msg)
|
||||
rrCache.Clean(600)
|
||||
err := rrCache.Save()
|
||||
if err != nil {
|
||||
log.Warningf(
|
||||
"resolver: failed to cache RR for %s%s: %s",
|
||||
inFlight.Query.FQDN,
|
||||
inFlight.Query.QType.String(),
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Debugf(
|
||||
"resolver: received possibly unsolicited reply from %s (%s): txid=%d q=%+v",
|
||||
mgr.tr.resolver.Name,
|
||||
mgr.conn.RemoteAddr(),
|
||||
msg.Id,
|
||||
msg.Question,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if readyToRecycle {
|
||||
// check to see if we can recycle the connection
|
||||
mgr.tr.Lock()
|
||||
activeQueries := len(mgr.tr.inFlightQueries)
|
||||
mgr.tr.Unlock()
|
||||
if activeQueries == 0 {
|
||||
log.Debugf("resolver: recycling conn to %s (%s)", mgr.tr.resolver.Name, mgr.conn.RemoteAddr())
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *tcpResolverConnMgr) msgReader(workerCtx context.Context) error {
|
||||
// copy values from manager
|
||||
conn := mgr.conn
|
||||
cancelConnCtx := mgr.cancelConnCtx
|
||||
connClosing := mgr.connClosing
|
||||
|
||||
for {
|
||||
msg, err := conn.ReadMsg()
|
||||
if err != nil {
|
||||
if connClosing.SetToIf(false, true) {
|
||||
cancelConnCtx()
|
||||
log.Warningf("resolver: read error from %s (%s): %s", mgr.tr.resolver.Name, mgr.conn.RemoteAddr(), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
mgr.responses <- msg
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ func init() {
|
|||
flag.StringVar(
|
||||
&testResolver,
|
||||
"resolver",
|
||||
"dot://9.9.9.9:853?verify=dns.quad9.net&name=Quad9&blockedif=empty",
|
||||
"dot://1.1.1.2:853?verify=cloudflare-dns.com&name=Cloudflare&blockedif=zeroip",
|
||||
"set custom resolver for testing",
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue