Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.116.47.194
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/awx/awx/test/awx/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/awx/awx/test/awx/test_job_template.py
from __future__ import absolute_import, division, print_function

__metaclass__ = type

import pytest

from awx.main.models import ActivityStream, JobTemplate, Job, NotificationTemplate


@pytest.mark.django_db
def test_create_job_template(run_module, admin_user, project, inventory):

    module_args = {
        'name': 'foo',
        'playbook': 'helloworld.yml',
        'project': project.name,
        'inventory': inventory.name,
        'extra_vars': {'foo': 'bar'},
        'job_type': 'run',
        'state': 'present',
    }

    result = run_module('job_template', module_args, admin_user)

    jt = JobTemplate.objects.get(name='foo')
    assert jt.extra_vars == '{"foo": "bar"}'

    assert result == {"name": "foo", "id": jt.id, "changed": True, "invocation": {"module_args": module_args}}

    assert jt.project_id == project.id
    assert jt.inventory_id == inventory.id


@pytest.mark.django_db
def test_resets_job_template_values(run_module, admin_user, project, inventory):

    module_args = {
        'name': 'foo',
        'playbook': 'helloworld.yml',
        'project': project.name,
        'inventory': inventory.name,
        'extra_vars': {'foo': 'bar'},
        'job_type': 'run',
        'state': 'present',
        'forks': 20,
        'timeout': 50,
        'allow_simultaneous': True,
        'ask_limit_on_launch': True,
        'ask_execution_environment_on_launch': True,
        'ask_forks_on_launch': True,
        'ask_instance_groups_on_launch': True,
        'ask_job_slice_count_on_launch': True,
        'ask_labels_on_launch': True,
        'ask_timeout_on_launch': True,
    }

    result = run_module('job_template', module_args, admin_user)

    jt = JobTemplate.objects.get(name='foo')
    assert jt.forks == 20
    assert jt.timeout == 50
    assert jt.allow_simultaneous
    assert jt.ask_limit_on_launch
    assert jt.ask_execution_environment_on_launch
    assert jt.ask_forks_on_launch
    assert jt.ask_instance_groups_on_launch
    assert jt.ask_job_slice_count_on_launch
    assert jt.ask_labels_on_launch
    assert jt.ask_timeout_on_launch

    module_args = {
        'name': 'foo',
        'playbook': 'helloworld.yml',
        'project': project.name,
        'inventory': inventory.name,
        'extra_vars': {'foo': 'bar'},
        'job_type': 'run',
        'state': 'present',
        'forks': 0,
        'timeout': 0,
        'allow_simultaneous': False,
        'ask_limit_on_launch': False,
        'ask_execution_environment_on_launch': False,
        'ask_forks_on_launch': False,
        'ask_instance_groups_on_launch': False,
        'ask_job_slice_count_on_launch': False,
        'ask_labels_on_launch': False,
        'ask_timeout_on_launch': False,
    }

    result = run_module('job_template', module_args, admin_user)
    assert result['changed']

    jt = JobTemplate.objects.get(name='foo')
    assert jt.forks == 0
    assert jt.timeout == 0
    assert not jt.allow_simultaneous
    assert not jt.ask_limit_on_launch
    assert not jt.ask_execution_environment_on_launch
    assert not jt.ask_forks_on_launch
    assert not jt.ask_instance_groups_on_launch
    assert not jt.ask_job_slice_count_on_launch
    assert not jt.ask_labels_on_launch
    assert not jt.ask_timeout_on_launch


@pytest.mark.django_db
def test_job_launch_with_prompting(run_module, admin_user, project, organization, inventory, machine_credential):
    JobTemplate.objects.create(
        name='foo',
        project=project,
        organization=organization,
        playbook='helloworld.yml',
        ask_variables_on_launch=True,
        ask_inventory_on_launch=True,
        ask_credential_on_launch=True,
    )
    result = run_module(
        'job_launch',
        dict(
            job_template='foo',
            inventory=inventory.name,
            credential=machine_credential.name,
            extra_vars={"var1": "My First Variable", "var2": "My Second Variable", "var3": "My Third Variable"},
        ),
        admin_user,
    )
    assert result.pop('changed', None), result

    job = Job.objects.get(id=result['id'])
    assert job.extra_vars == '{"var1": "My First Variable", "var2": "My Second Variable", "var3": "My Third Variable"}'
    assert job.inventory == inventory
    assert [cred.id for cred in job.credentials.all()] == [machine_credential.id]


@pytest.mark.django_db
def test_job_template_with_new_credentials(run_module, admin_user, project, inventory, machine_credential, vault_credential):
    result = run_module(
        'job_template',
        dict(
            name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, credentials=[machine_credential.name, vault_credential.name]
        ),
        admin_user,
    )
    assert not result.get('failed', False), result.get('msg', result)
    assert result.get('changed', False), result
    jt = JobTemplate.objects.get(pk=result['id'])

    assert set([machine_credential.id, vault_credential.id]) == set([cred.pk for cred in jt.credentials.all()])

    prior_ct = ActivityStream.objects.count()
    result = run_module(
        'job_template',
        dict(
            name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, credentials=[machine_credential.name, vault_credential.name]
        ),
        admin_user,
    )
    assert not result.get('failed', False), result.get('msg', result)
    assert not result.get('changed', True), result
    jt.refresh_from_db()
    assert result['id'] == jt.id

    assert set([machine_credential.id, vault_credential.id]) == set([cred.pk for cred in jt.credentials.all()])
    assert ActivityStream.objects.count() == prior_ct


@pytest.mark.django_db
def test_job_template_with_survey_spec(run_module, admin_user, project, inventory, survey_spec):
    result = run_module(
        'job_template',
        dict(name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, survey_spec=survey_spec, survey_enabled=True),
        admin_user,
    )
    assert not result.get('failed', False), result.get('msg', result)
    assert result.get('changed', False), result
    jt = JobTemplate.objects.get(pk=result['id'])

    assert jt.survey_spec == survey_spec

    prior_ct = ActivityStream.objects.count()
    result = run_module(
        'job_template',
        dict(name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, survey_spec=survey_spec, survey_enabled=True),
        admin_user,
    )
    assert not result.get('failed', False), result.get('msg', result)
    assert not result.get('changed', True), result
    jt.refresh_from_db()
    assert result['id'] == jt.id

    assert jt.survey_spec == survey_spec
    assert ActivityStream.objects.count() == prior_ct


@pytest.mark.django_db
def test_job_template_with_wrong_survey_spec(run_module, admin_user, project, inventory, survey_spec):
    result = run_module(
        'job_template',
        dict(name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, survey_spec=survey_spec, survey_enabled=True),
        admin_user,
    )
    assert not result.get('failed', False), result.get('msg', result)
    assert result.get('changed', False), result
    jt = JobTemplate.objects.get(pk=result['id'])

    assert jt.survey_spec == survey_spec

    prior_ct = ActivityStream.objects.count()

    del survey_spec['description']

    result = run_module(
        'job_template',
        dict(name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, survey_spec=survey_spec, survey_enabled=True),
        admin_user,
    )
    assert result.get('failed', True)
    assert result.get('msg') == "Failed to update survey: Field 'description' is missing from survey spec."

    assert ActivityStream.objects.count() == prior_ct


@pytest.mark.django_db
def test_job_template_with_survey_encrypted_default(run_module, admin_user, project, inventory, silence_warning):
    spec = {
        "spec": [{"index": 0, "question_name": "my question?", "default": "very_secret_value", "variable": "myvar", "type": "password", "required": False}],
        "description": "test",
        "name": "test",
    }
    for i in range(2):
        result = run_module(
            'job_template',
            dict(name='foo', playbook='helloworld.yml', project=project.name, inventory=inventory.name, survey_spec=spec, survey_enabled=True),
            admin_user,
        )
        assert not result.get('failed', False), result.get('msg', result)

    assert result.get('changed', False), result  # not actually desired, but assert for sanity

    silence_warning.assert_called_once_with(
        "The field survey_spec of job_template {0} has encrypted data and " "may inaccurately report task is changed.".format(result['id'])
    )


@pytest.mark.django_db
def test_associate_only_on_success(run_module, admin_user, organization, project):
    jt = JobTemplate.objects.create(
        name='foo',
        project=project,
        playbook='helloworld.yml',
        ask_inventory_on_launch=True,
    )
    create_kwargs = dict(
        notification_configuration={'url': 'http://www.example.com/hook', 'headers': {'X-Custom-Header': 'value123'}, 'password': 'bar'},
        notification_type='webhook',
        organization=organization,
    )
    nt1 = NotificationTemplate.objects.create(name='nt1', **create_kwargs)
    nt2 = NotificationTemplate.objects.create(name='nt2', **create_kwargs)

    jt.notification_templates_error.add(nt1)

    # test preservation of error NTs when success NTs are added
    result = run_module('job_template', dict(name='foo', playbook='helloworld.yml', project=project.name, notification_templates_success=['nt2']), admin_user)
    assert not result.get('failed', False), result.get('msg', result)
    assert result.get('changed', True), result

    assert list(jt.notification_templates_success.values_list('id', flat=True)) == [nt2.id]
    assert list(jt.notification_templates_error.values_list('id', flat=True)) == [nt1.id]

    # test removal to empty list
    result = run_module('job_template', dict(name='foo', playbook='helloworld.yml', project=project.name, notification_templates_success=[]), admin_user)
    assert not result.get('failed', False), result.get('msg', result)
    assert result.get('changed', True), result

    assert list(jt.notification_templates_success.values_list('id', flat=True)) == []
    assert list(jt.notification_templates_error.values_list('id', flat=True)) == [nt1.id]

Anon7 - 2022
AnonSec Team