Skip to content

Commit

Permalink
Merge pull request #151 from ricarkol/containers-flake8-clean
Browse files Browse the repository at this point in the history
Containers flake8 cleanup
  • Loading branch information
ricarkol authored Sep 10, 2016
2 parents d651910 + 1677269 commit daf54fb
Show file tree
Hide file tree
Showing 7 changed files with 544 additions and 75 deletions.
44 changes: 41 additions & 3 deletions crawler/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,59 @@
import os
import logging
import defaults
import psutil
import misc
import namespace

logger = logging.getLogger('crawlutils')


def list_raw_containers(user_list='ALL'):
    """
    Yield running (non-docker) containers as `Container` objects.

    A running container is defined as a group of processes with the
    `pid` namespace different to the `init` process `pid` namespace.

    :param user_list: 'ALL' (any capitalization) to yield every container,
        or a comma-separated string of container pids to restrict to.
    """
    init_ns = namespace.get_pid_namespace(1)
    for p in psutil.process_iter():
        # Older psutil releases exposed `pid` as a method; newer ones as
        # an attribute. Support both.
        pid = (p.pid() if hasattr(p.pid, '__call__') else p.pid)
        if pid == 1 or pid == '1':

            # don't confuse the init process as a container

            continue
        if user_list not in ['ALL', 'all', 'All']:

            # BUG FIX: the original tested `str(pid) not in user_list`,
            # a substring match against the raw comma-separated string,
            # so pid 12 wrongly matched a user_list of '123,456'. Compare
            # against the exact entries instead, consistent with how
            # list_docker_containers() splits its user_list.
            if str(pid) not in user_list.split(','):

                # skip containers not in the list

                continue
        if misc.process_is_crawler(pid):

            # don't confuse the crawler process with a container

            continue
        curr_ns = namespace.get_pid_namespace(pid)
        if not curr_ns:

            # invalid container

            continue
        if curr_ns == init_ns:
            # processes sharing init's pid namespace are not containerized
            continue
        # BUG FIX: Container's second positional parameter is
        # `container_opts`, so the original `Container(pid, curr_ns)`
        # stuffed the namespace into the options dict. Pass it by keyword
        # so it reaches `process_namespace` as intended.
        yield Container(pid, process_namespace=curr_ns)


class Container(object):

"""
This class abstracts a running Linux container.
A running container is defined as a process subtree with the `pid`
namespace different to the `init` process `pid` namespace.
"""

def __init__(
self,
pid,
container_opts={},
process_namespace=None,
):
self.pid = str(pid)
self.short_id = str(hash(pid))
Expand All @@ -30,6 +66,8 @@ def __init__(
self.root_fs = None
self.log_prefix = None
self.log_file_list = None
self.process_namespace = (process_namespace or
namespace.get_pid_namespace(pid))

# XXX(kollerr): when running in alchemy environment, non-alchemy
# containres should be ignored
Expand Down
76 changes: 16 additions & 60 deletions crawler/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@

# External dependencies that must be pip install'ed separately

import psutil

import defaults
from container import Container
import container
import misc
import namespace
from dockercontainer import list_docker_containers
from crawler_exceptions import ContainerInvalidEnvironment

logger = logging.getLogger('crawlutils')

Expand All @@ -25,59 +21,19 @@ def list_all_containers(user_list='ALL',
A running container is defined as a process subtree with the `pid`
namespace different to the `init` process `pid` namespace.
"""
all_docker_containers = list_docker_containers(container_opts)

if user_list in ['ALL', 'all', 'All']:
init_ns = namespace.get_pid_namespace(1)

visited_ns = set() # visited PID namespaces

# Start with all docker containers

for container in all_docker_containers:
curr_ns = namespace.get_pid_namespace(container.pid)
if not curr_ns:
continue
if curr_ns not in visited_ns and curr_ns != init_ns:
visited_ns.add(curr_ns)
try:
yield container
except ContainerInvalidEnvironment as e:
logger.exception(e)

# Continue with all other containers not known to docker

for p in psutil.process_iter():
pid = (p.pid() if hasattr(p.pid, '__call__') else p.pid)
if pid == 1 or pid == '1':

# don't confuse the init process as a container

continue
if misc.process_is_crawler(pid):

# don't confuse the crawler process with a container

continue
curr_ns = namespace.get_pid_namespace(pid)
if not curr_ns:

# invalid container

continue
if curr_ns not in visited_ns and curr_ns != init_ns:
visited_ns.add(curr_ns)
yield Container(pid)
else:
visited_ns = set() # visited PID namespaces

# User provided a list of containers
for _container in list_docker_containers(container_opts, user_list):
curr_ns = _container.process_namespace
if curr_ns not in visited_ns:
visited_ns.add(curr_ns)
yield _container

user_containers = user_list.split(',')
for container in all_docker_containers:
short_id_match = container.short_id in user_containers
long_id_match = container.long_id in user_containers
if short_id_match or long_id_match:
yield container
for _container in container.list_raw_containers(user_list):
curr_ns = _container.process_namespace
if curr_ns not in visited_ns:
visited_ns.add(curr_ns)
yield _container


def get_filtered_list_of_containers(
Expand Down Expand Up @@ -108,7 +64,7 @@ def get_filtered_list_of_containers(

filtered_list = []
containers_list = list_all_containers(user_list, container_opts)
for container in containers_list:
for _container in containers_list:

"""
There are docker and non-docker containers in this list. An example of
Expand All @@ -118,17 +74,17 @@ def get_filtered_list_of_containers(
"""

if (environment != defaults.DEFAULT_ENVIRONMENT and
not container.is_docker_container()):
not _container.is_docker_container()):
continue

"""
The partition strategy is to split all the containers equally by
process pid. We do it by hashing the long_id of the container.
"""

_hash = container.long_id
_hash = _container.long_id
num = int(_hash, 16) % int(num_processes)
if num == process_id:
filtered_list.append(container)
filtered_list.append(_container)

return filtered_list
1 change: 1 addition & 0 deletions crawler/crawler_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ContainerWithoutCgroups(Exception):

pass


class DockerutilsException(Exception):

"""Exception from the dockerutils module."""
Expand Down
14 changes: 13 additions & 1 deletion crawler/dockercontainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
DockerutilsException,
ContainerWithoutCgroups)
from requests.exceptions import HTTPError
import namespace

logger = logging.getLogger('crawlutils')


def list_docker_containers(container_opts={}):
def list_docker_containers(container_opts={}, user_list='ALL'):
"""
Get the list of running Docker containers, as `DockerContainer` objects.
Expand All @@ -34,6 +35,13 @@ def list_docker_containers(container_opts={}):
"""
for inspect in exec_dockerps():
long_id = inspect['Id']

if user_list not in ['ALL', 'all', 'All']:
user_ctrs = [cid[:12] for cid in user_list.split(',')]
short_id = long_id[:12]
if short_id not in user_ctrs:
continue

try:
c = DockerContainer(long_id, inspect, container_opts)
if c.namespace:
Expand Down Expand Up @@ -73,6 +81,7 @@ def __init__(
long_id,
inspect=None,
container_opts={},
process_namespace=None,
):

# Some quick sanity checks
Expand Down Expand Up @@ -105,6 +114,9 @@ def __init__(
self.volumes = inspect.get('Volumes')
self.inspect = inspect

self.process_namespace = (process_namespace or
namespace.get_pid_namespace(self.pid))

# This short ID is mainly used for logging purposes
self.short_id = long_id[:12]

Expand Down
7 changes: 4 additions & 3 deletions crawler/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# Kafka logs too much
logging.getLogger('kafka').addHandler(NullHandler())


def kafka_send(kurl, temp_fpath, format, topic, queue=None):
try:
kafka_python_client = kafka_python.KafkaClient(kurl)
Expand Down Expand Up @@ -59,6 +60,7 @@ def kafka_send(kurl, temp_fpath, format, topic, queue=None):
finally:
queue and queue.close()


class Emitter:

"""Class that abstracts the outputs supported by the crawler, like
Expand Down Expand Up @@ -321,7 +323,6 @@ def _publish_to_kafka_no_retries(self, url):
kurl, self.temp_fpath, self.format, topic, queue))
child_process.start()
except OSError:
#queue.close() # closing queue in finally clause
raise

try:
Expand All @@ -335,7 +336,6 @@ def _publish_to_kafka_no_retries(self, url):
if child_process.is_alive():
errmsg = ('Timed out waiting for process %d to exit.' %
child_process.pid)
#queue.close() # closing queue in finally clause
os.kill(child_process.pid, 9)
logger.error(errmsg)
raise EmitterEmitTimeout(errmsg)
Expand Down Expand Up @@ -427,7 +427,8 @@ def __exit__(
try:
self._publish(url)
except EmitterEmitTimeout as exc:
logger.warning("Failed to emit due to timout, continuing anyway...")
logger.warning('Failed to emit due to timeout, '
'continuing anyway...')
finally:
if os.path.exists(self.temp_fpath):
os.remove(self.temp_fpath)
Expand Down
Loading

0 comments on commit daf54fb

Please sign in to comment.