Skip to content

Commit

Permalink
[connection] Made test mixins reusable (for extensions)
Browse files Browse the repository at this point in the history
  • Loading branch information
nemesifier committed Apr 7, 2019
1 parent 445611f commit b055fc6
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 79 deletions.
83 changes: 83 additions & 0 deletions openwisp_controller/connection/tests/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os

from django.conf import settings
from mockssh import Server as SshServer
from openwisp_controller.config.models import Config, Device
from openwisp_controller.config.tests import CreateConfigTemplateMixin

from openwisp_users.tests.utils import TestOrganizationMixin

from .. import settings as app_settings
from ..models import Credentials, DeviceConnection, DeviceIp


class CreateConnectionsMixin(CreateConfigTemplateMixin, TestOrganizationMixin):
device_model = Device
config_model = Config

def _create_credentials(self, **kwargs):
opts = dict(name='Test credentials',
connector=app_settings.CONNECTORS[0][0],
params={'username': 'root',
'password': 'password',
'port': 22})
opts.update(kwargs)
if 'organization' not in opts:
opts['organization'] = self._create_org()
c = Credentials(**opts)
c.full_clean()
c.save()
return c

def _create_credentials_with_key(self, username='root', port=22, **kwargs):
opts = dict(name='Test SSH Key',
params={'username': username,
'key': self._SSH_PRIVATE_KEY,
'port': port})
return self._create_credentials(**opts)

def _create_device_connection(self, **kwargs):
opts = dict(enabled=True,
params={})
opts.update(kwargs)
if 'credentials' not in opts:
opts['credentials'] = self._create_credentials()
org = opts['credentials'].organization
if 'device' not in opts:
opts['device'] = self._create_device(organization=org)
self._create_config(device=opts['device'])
dc = DeviceConnection(**opts)
dc.full_clean()
dc.save()
return dc

def _create_device_ip(self, **kwargs):
opts = dict(address='10.40.0.1',
priority=1)
opts.update(kwargs)
if 'device' not in opts:
dc = self._create_device_connection()
opts['device'] = dc.device
ip = DeviceIp(**opts)
ip.full_clean()
ip.save()
return ip


class SshServerMixin(object):
_TEST_RSA_KEY_PATH = os.path.join(settings.BASE_DIR, 'test-key.rsa')
_SSH_PRIVATE_KEY = None

@classmethod
def setUpClass(cls):
with open(cls._TEST_RSA_KEY_PATH, 'r') as f:
cls._SSH_PRIVATE_KEY = f.read()
cls.ssh_server = SshServer({'root': cls._TEST_RSA_KEY_PATH})
cls.ssh_server.__enter__()

@classmethod
def tearDownClass(cls):
try:
cls.ssh_server.__exit__()
except OSError:
pass
83 changes: 4 additions & 79 deletions openwisp_controller/connection/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,14 @@
import os

import paramiko
from django.conf import settings
from django.core.exceptions import ValidationError
from django.test import TestCase
from mockssh import Server as SshServer
from openwisp_controller.config.models import Config, Device
from openwisp_controller.config.tests import CreateConfigTemplateMixin

from openwisp_users.tests.utils import TestOrganizationMixin

from .. import settings as app_settings
from ..models import Credentials, DeviceConnection, DeviceIp
from ..models import Credentials, DeviceIp
from ..utils import get_interfaces
from .base import CreateConnectionsMixin, SshServerMixin


class TestConnectionMixin(CreateConfigTemplateMixin, TestOrganizationMixin):
device_model = Device
config_model = Config
_TEST_RSA_KEY_PATH = os.path.join(settings.BASE_DIR, 'test-key.rsa')
with open(_TEST_RSA_KEY_PATH, 'r') as f:
_SSH_PRIVATE_KEY = f.read()

@classmethod
def setUpClass(cls):
cls.ssh_server = SshServer({'root': cls._TEST_RSA_KEY_PATH})
cls.ssh_server.__enter__()

@classmethod
def tearDownClass(cls):
try:
cls.ssh_server.__exit__()
except OSError:
pass

def _create_credentials(self, **kwargs):
opts = dict(name='Test credentials',
connector=app_settings.CONNECTORS[0][0],
params={'username': 'root',
'password': 'password',
'port': 22})
opts.update(kwargs)
if 'organization' not in opts:
opts['organization'] = self._create_org()
c = Credentials(**opts)
c.full_clean()
c.save()
return c

def _create_credentials_with_key(self, username='root', port=22, **kwargs):
opts = dict(name='Test SSH Key',
params={'username': username,
'key': self._SSH_PRIVATE_KEY,
'port': port})
return self._create_credentials(**opts)

def _create_device_connection(self, **kwargs):
opts = dict(enabled=True,
params={})
opts.update(kwargs)
if 'credentials' not in opts:
opts['credentials'] = self._create_credentials()
org = opts['credentials'].organization
if 'device' not in opts:
opts['device'] = self._create_device(organization=org)
self._create_config(device=opts['device'])
dc = DeviceConnection(**opts)
dc.full_clean()
dc.save()
return dc

def _create_device_ip(self, **kwargs):
opts = dict(address='10.40.0.1',
priority=1)
opts.update(kwargs)
if 'device' not in opts:
dc = self._create_device_connection()
opts['device'] = dc.device
ip = DeviceIp(**opts)
ip.full_clean()
ip.save()
return ip


class TestModels(TestConnectionMixin, TestCase):
class TestModels(SshServerMixin, CreateConnectionsMixin, TestCase):
def test_connection_str(self):
c = Credentials(name='Dev Key', connector=app_settings.CONNECTORS[0][0])
self.assertIn(c.name, str(c))
Expand Down Expand Up @@ -127,7 +52,7 @@ def test_device_connection_auto_update_strategy_missing_config(self):
try:
self._create_device_connection(device=device)
except ValidationError as e:
self.assertIn('inferred from', str(e))
self.assertIn('inferred from', str(e))
else:
self.fail('ValidationError not raised')

Expand Down

0 comments on commit b055fc6

Please sign in to comment.