Skip to content

Commit

Permalink
Merge pull request #807 from deniszh/backport/1.1.x/pr-800_pr-801_pr-…
Browse files Browse the repository at this point in the history
…801_pr-801_pr-801_pr-801_pr-801_pr-801_pr-801_pr-802_pr-801

[1.1.x] Stop accept() when we reach MAX_RECEIVER_CONNECTIONS | Add DESTINATIONS_POOL_REPLICAS | client: deal correctly with cases where destinations are down | client: fix USE_RATIO_RESET with DESTINATION_POO
  • Loading branch information
deniszh authored Sep 2, 2018
2 parents 5363f55 + f143cfe commit d94ab87
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 28 deletions.
9 changes: 9 additions & 0 deletions conf/carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,15 @@ DESTINATIONS = 127.0.0.1:2004
# DESTINATION_TRANSPORT = none
# DESTINATION_SSL_CA=/path/to/private-ca.crt

# This allows to have multiple connections per destinations, this will
# pool all the replicas of a single host in the same queue and distribute
# points accross these replicas instead of replicating them.
# The following example will balance the load between :0 and :1.
## DESTINATIONS = foo:2001:0, foo:2001:1
## RELAY_METHOD = rules
# Note: this is currently incompatible with USE_RATIO_RESET which gets
# disabled if this option is enabled.
# DESTINATIONS_POOL_REPLICAS = False

# When using consistent hashing it sometime makes sense to make
# the ring dynamic when you don't want to loose points when a
Expand Down
87 changes: 66 additions & 21 deletions lib/carbon/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import deque
from collections import deque, defaultdict
from time import time
from six import with_metaclass

Expand All @@ -7,11 +7,13 @@
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver

from carbon.conf import settings
from carbon.util import pickle
from carbon import instrumentation, log, pipeline, state
from carbon.util import PluginRegistrar
from carbon.util import enableTcpKeepAlive
from carbon.resolver import setUpRandomResolver
from carbon import instrumentation, log, pipeline, state

try:
from OpenSSL import SSL
Expand Down Expand Up @@ -121,11 +123,10 @@ def sendQueued(self):
if not self.factory.hasQueuedDatapoints():
return

if settings.USE_RATIO_RESET is True:
if not self.connectionQualityMonitor():
self.resetConnectionForQualityReasons("Sent: {0}, Received: {1}".format(
instrumentation.prior_stats.get(self.sent, 0),
instrumentation.prior_stats.get('metricsReceived', 0)))
if not self.connectionQualityMonitor():
self.resetConnectionForQualityReasons("Sent: {0}, Received: {1}".format(
instrumentation.prior_stats.get(self.sent, 0),
instrumentation.prior_stats.get('metricsReceived', 0)))

self.sendDatapointsNow(self.factory.takeSomeFromQueue())
if (self.factory.queueFull.called and queueSize < SEND_QUEUE_LOW_WATERMARK):
Expand All @@ -148,8 +149,16 @@ def connectionQualityMonitor(self):
False means that quality is bad
"""
if not settings.USE_RATIO_RESET:
return True

if settings.DESTINATION_POOL_REPLICAS:
received = self.factory.attemptedRelays
else:
received = 'metricsReceived'

destination_sent = float(instrumentation.prior_stats.get(self.sent, 0))
total_received = float(instrumentation.prior_stats.get('metricsReceived', 0))
total_received = float(instrumentation.prior_stats.get(received, 0))
instrumentation.increment(self.slowConnectionReset, 0)
if total_received < settings.MIN_RESET_STAT_FLOW:
return True
Expand Down Expand Up @@ -216,9 +225,9 @@ def __init__(self, destination, router):
self.connectedProtocol = None
self.queueEmpty = Deferred()
self.queueFull = Deferred()
self.queueFull.addCallback(self.queueFullCallback)
self.queueFull.addCallbacks(self.queueFullCallback, log.err)
self.queueHasSpace = Deferred()
self.queueHasSpace.addCallback(self.queueSpaceCallback)
self.queueHasSpace.addCallbacks(self.queueSpaceCallback, log.err)
# Args: {'connector': connector, 'reason': reason}
self.connectFailed = Deferred()
# Args: {'connector': connector, 'reason': reason}
Expand Down Expand Up @@ -253,10 +262,10 @@ def queueSpaceCallback(self, result):
if self.queueFull.called:
log.clients('%s send queue has space available' % self.connectedProtocol)
self.queueFull = Deferred()
self.queueFull.addCallback(self.queueFullCallback)
self.queueFull.addCallbacks(self.queueFullCallback, log.err)
state.events.cacheSpaceAvailable()
self.queueHasSpace = Deferred()
self.queueHasSpace.addCallback(self.queueSpaceCallback)
self.queueHasSpace.addCallbacks(self.queueSpaceCallback, log.err)

def buildProtocol(self, addr):
self.connectedProtocol = self.clientProtocol()
Expand All @@ -265,26 +274,28 @@ def buildProtocol(self, addr):

def startConnecting(self): # calling this startFactory yields recursion problems
self.started = True

if settings['DESTINATION_TRANSPORT'] == "ssl":
if not SSL or not ssl:
print("SSL destination transport request, but no Python OpenSSL available.")
raise SystemExit(1)
authority = None
if settings['DESTINATION_SSL_CA']:
try:
f = open(settings['DESTINATION_SSL_CA'])
with open(settings['DESTINATION_SSL_CA']) as f:
authority = ssl.Certificate.loadPEM(f.read())
except IOError:
print("Failed to read CA chain: %s" % settings['DESTINATION_SSL_CA'])
raise SystemExit(1)
# Twisted 14 introduced this function, it might not be around on older installs.
if hasattr(ssl, "optionsForClientTLS"):
client = ssl.optionsForClientTLS(self.host.decode('utf-8'), authority)
from six import u
client = ssl.optionsForClientTLS(u(self.host), authority)
else:
client = CAReplaceClientContextFactory(settings['DESTINATION_SSL_CA'])
self.connector = reactor.connectSSL(self.host, self.port, self, client)
else:
self.connector = reactor.connectTCP(self.host, self.port, self)
self.connector = reactor.connectTCP(self.host, self.port, self)

def stopConnecting(self):
self.started = False
Expand Down Expand Up @@ -429,7 +440,7 @@ def destinationDown(self, destination):
self.queue.clear()

def disconnect(self):
self.queueEmpty.addCallback(lambda result: self.stopConnecting())
self.queueEmpty.addCallbacks(lambda result: self.stopConnecting(), log.err)
readyToStop = DeferredList(
[self.connectionLost, self.connectFailed],
fireOnOneCallback=True,
Expand Down Expand Up @@ -513,8 +524,16 @@ def reinjectDatapoints(self):

class CarbonClientManager(Service):
def __init__(self, router):
if settings.DESTINATION_POOL_REPLICAS:
# If we decide to open multiple TCP connection to a replica, we probably
# want to try to also load-balance accross hosts. In this case we need
# to make sure rfc3484 doesn't get in the way.
setUpRandomResolver(reactor)

self.router = router
self.client_factories = {} # { destination : CarbonClientFactory() }
# { destination[0:2]: set(CarbonClientFactory()) }
self.pooled_factories = defaultdict(set)

# This fake factory will be used as a buffer when we did not manage
# to connect to any destination.
Expand Down Expand Up @@ -558,6 +577,7 @@ def startClient(self, destination):

factory = self.createFactory(destination)
self.client_factories[destination] = factory
self.pooled_factories[destination[0:2]].add(factory)

connectAttempted = DeferredList(
[factory.connectionMade, factory.connectFailed],
Expand All @@ -575,11 +595,14 @@ def stopClient(self, destination):

self.router.removeDestination(destination)
stopCompleted = factory.disconnect()
stopCompleted.addCallback(lambda result: self.disconnectClient(destination))
stopCompleted.addCallbacks(
lambda result: self.disconnectClient(destination), log.err
)
return stopCompleted

def disconnectClient(self, destination):
factory = self.client_factories.pop(destination)
self.pooled_factories[destination[0:2]].remove(factory)
c = factory.connector
if c and c.state == 'connecting' and not factory.hasQueuedDatapoints():
c.stopConnecting()
Expand All @@ -600,13 +623,35 @@ def getDestinations(self, metric):
return [None]
return destinations

def getFactories(self, metric):
destinations = self.getDestinations(metric)
factories = set()

if not settings.DESTINATION_POOL_REPLICAS:
# Simple case, with only one replica per destination.
for d in destinations:
# If we can't find it, we add to the 'fake' factory / buffer.
factories.add(self.client_factories.get(d))
else:
# Here we might have multiple replicas per destination.
for d in destinations:
if d is None:
# d == None means there are no destinations currently available, so
# we just put the data into our fake factory / buffer.
factories.add(self.client_factories[None])
else:
# Else we take the replica with the smallest queue size.
key = d[0:2] # Take only host:port, not instance.
factories.add(min(self.pooled_factories[key], key=lambda f: f.queueSize))
return factories

def sendDatapoint(self, metric, datapoint):
for destination in self.getDestinations(metric):
self.client_factories[destination].sendDatapoint(metric, datapoint)
for factory in self.getFactories(metric):
factory.sendDatapoint(metric, datapoint)

def sendHighPriorityDatapoint(self, metric, datapoint):
for destination in self.getDestinations(metric):
self.client_factories[destination].sendHighPriorityDatapoint(metric, datapoint)
for factory in self.getFactories(metric):
factory.sendHighPriorityDatapoint(metric, datapoint)

def __str__(self):
return "<%s[%x]>" % (self.__class__.__name__, id(self))
Expand Down
1 change: 1 addition & 0 deletions lib/carbon/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
DESTINATION_PROTOCOL="pickle",
DESTINATION_TRANSPORT="none",
DESTINATION_SSL_CA=None,
DESTINATION_POOL_REPLICAS=False,
USE_FLOW_CONTROL=True,
USE_INSECURE_UNPICKLER=False,
USE_WHITELIST=False,
Expand Down
2 changes: 1 addition & 1 deletion lib/carbon/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def recordMetrics():
# Preserve the count of sent metrics so that the ratio of
# received : sent can be checked per-relay to determine the
# health of the destination.
if stat_name.endswith('.sent'):
if stat_name.endswith('.sent') or stat_name.endswith('.attemptedRelays'):
myPriorStats[stat_name] = stat_value

# common metrics
Expand Down
37 changes: 31 additions & 6 deletions lib/carbon/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,42 @@
from carbon.util import enableTcpKeepAlive


def checkIfAcceptingConnections():
clients = len(state.connectedMetricReceiverProtocols)
max_clients = settings.MAX_RECEIVER_CONNECTIONS

if clients < max_clients:
for port in state.listeningPorts:
if port.paused:
log.listener(
"Resuming %s (%d/%d connections)" % (port, clients, max_clients))
port.resumeProducing()
port.paused = False
else:
for port in state.listeningPorts:
if not port.paused:
log.listener(
"Pausing %s (%d/%d connections)" % (port, clients, max_clients))
port.pauseProducing()
port.paused = True


class CarbonReceiverFactory(ServerFactory):

def buildProtocol(self, addr):
from carbon.conf import settings
clients = len(state.connectedMetricReceiverProtocols)
max_clients = settings.MAX_RECEIVER_CONNECTIONS

# Don't establish the connection if we have reached the limit.
if len(state.connectedMetricReceiverProtocols) < settings.MAX_RECEIVER_CONNECTIONS:
if clients < max_clients:
return ServerFactory.buildProtocol(self, addr)
else:
return None


class CarbonService(service.Service):
"""creates our own socket to support SO_REUSEPORT
to be removed when twisted supports it natively
see https://github.com/twisted/twisted/pull/759
"""Create our own socket to support SO_REUSEPORT.
To be removed when twisted supports it natively
See: https://github.com/twisted/twisted/pull/759.
"""
factory = None
protocol = None
Expand Down Expand Up @@ -61,6 +82,8 @@ def startService(self):
carbon_sock.listen(tmp_port.backlog)
self._port = reactor.adoptStreamPort(
carbon_sock.fileno(), socket.AF_INET, self.factory)
state.listeningPorts.append(self._port)
self._port.paused = False
carbon_sock.close()

def stopService(self):
Expand Down Expand Up @@ -107,6 +130,7 @@ def connectionMade(self):
self.pauseReceiving()

state.connectedMetricReceiverProtocols.add(self)
checkIfAcceptingConnections()
if settings.USE_FLOW_CONTROL:
events.pauseReceivingMetrics.addHandler(self.pauseReceiving)
events.resumeReceivingMetrics.addHandler(self.resumeReceiving)
Expand Down Expand Up @@ -135,6 +159,7 @@ def connectionLost(self, reason):
"%s connection with %s lost: %s" % (self.__class__.__name__, self.peerName, reason.value))

state.connectedMetricReceiverProtocols.remove(self)
checkIfAcceptingConnections()
if settings.USE_FLOW_CONTROL:
events.pauseReceivingMetrics.removeHandler(self.pauseReceiving)
events.resumeReceivingMetrics.removeHandler(self.resumeReceiving)
Expand Down
79 changes: 79 additions & 0 deletions lib/carbon/resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import random

from zope.interface import implementer

from twisted.internet._resolver import GAIResolver
from twisted.internet.defer import Deferred
from twisted.internet.address import IPv4Address
from twisted.internet.interfaces import IResolverSimple, IResolutionReceiver
from twisted.internet.error import DNSLookupError


# Inspired from /twisted/internet/_resolver.py
@implementer(IResolutionReceiver)
class RandomWins(object):
"""
An L{IResolutionReceiver} which fires a L{Deferred} with a random result.
"""

def __init__(self, deferred):
"""
@param deferred: The L{Deferred} to fire with one resolution
result arrives.
"""
self._deferred = deferred
self._results = []

def resolutionBegan(self, resolution):
"""
See L{IResolutionReceiver.resolutionBegan}
@param resolution: See L{IResolutionReceiver.resolutionBegan}
"""
self._resolution = resolution

def addressResolved(self, address):
"""
See L{IResolutionReceiver.addressResolved}
@param address: See L{IResolutionReceiver.addressResolved}
"""
self._results.append(address.host)

def resolutionComplete(self):
"""
See L{IResolutionReceiver.resolutionComplete}
"""
if self._results:
random.shuffle(self._results)
self._deferred.callback(self._results[0])
else:
self._deferred.errback(DNSLookupError(self._resolution.name))


@implementer(IResolverSimple)
class ComplexResolverSimplifier(object):
"""
A converter from L{IHostnameResolver} to L{IResolverSimple}
"""
def __init__(self, nameResolver):
"""
Create a L{ComplexResolverSimplifier} with an L{IHostnameResolver}.
@param nameResolver: The L{IHostnameResolver} to use.
"""
self._nameResolver = nameResolver

def getHostByName(self, name, timeouts=()):
"""
See L{IResolverSimple.getHostByName}
@param name: see L{IResolverSimple.getHostByName}
@param timeouts: see L{IResolverSimple.getHostByName}
@return: see L{IResolverSimple.getHostByName}
"""
result = Deferred()
self._nameResolver.resolveHostName(RandomWins(result), name, 0,
[IPv4Address])
return result


def setUpRandomResolver(reactor):
resolver = GAIResolver(reactor, reactor.getThreadPool)
reactor.installResolver(ComplexResolverSimplifier(resolver))
3 changes: 3 additions & 0 deletions lib/carbon/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ def remove_node(self, node):
self._update_nodes()

def get_nodes(self, key):
if not self.nodes:
return

seed = self._hash(key) % len(self.nodes)

for n in xrange(seed, seed + len(self.nodes)):
Expand Down
Loading

0 comments on commit d94ab87

Please sign in to comment.