Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.12.152.100
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/aws/plugins/modules//msk_cluster.py
#!/usr/bin/python
# Copyright: (c) 2021, Daniil Kupchenko (@oukooveu)
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type


DOCUMENTATION = r"""
---
module: msk_cluster
short_description: Manage Amazon MSK clusters
version_added: "2.0.0"
description:
    - Create, delete and modify Amazon MSK (Managed Streaming for Apache Kafka) clusters.
    - Prior to release 5.0.0 this module was called C(community.aws.aws_msk_cluster).
      The usage did not change.
author:
    - Daniil Kupchenko (@oukooveu)
options:
    state:
        description: Create (C(present)) or delete (C(absent)) cluster.
        choices: ['present', 'absent']
        type: str
        default: 'present'
    name:
        description: The name of the cluster.
        required: true
        type: str
    version:
        description:
            - The version of Apache Kafka.
            - This version should exist in given configuration.
            - This parameter is required when I(state=present).
        type: str
    configuration_arn:
        description:
            - ARN of the configuration to use.
            - This parameter is required when I(state=present).
        type: str
    configuration_revision:
        description:
            - The revision of the configuration to use.
            - This parameter is required when I(state=present).
        type: int
    nodes:
        description: The number of broker nodes in the cluster. Should be greater or equal to two.
        type: int
        default: 3
    instance_type:
        description:
            - The type of Amazon EC2 instances to use for Kafka brokers.
        choices:
            - kafka.t3.small
            - kafka.m5.large
            - kafka.m5.xlarge
            - kafka.m5.2xlarge
            - kafka.m5.4xlarge
        default: kafka.t3.small
        type: str
    ebs_volume_size:
        description: The size in GiB of the EBS volume for the data drive on each broker node.
        type: int
        default: 100
    subnets:
        description:
            - The list of subnets to connect to in the client virtual private cloud (VPC).
              AWS creates elastic network interfaces inside these subnets. Client applications use
              elastic network interfaces to produce and consume data.
            - Client subnets can't be in Availability Zone us-east-1e.
            - This parameter is required when I(state=present).
        type: list
        elements: str
    security_groups:
        description:
            - The AWS security groups to associate with the elastic network interfaces in order to specify
              who can connect to and communicate with the Amazon MSK cluster.
              If you don't specify a security group, Amazon MSK uses the default security group associated with the VPC.
        type: list
        elements: str
    encryption:
        description:
            - Includes all encryption-related information.
            - Effective only for new cluster and can not be updated.
        type: dict
        suboptions:
            kms_key_id:
                description:
                    - The ARN of the AWS KMS key for encrypting data at rest. If you don't specify a KMS key, MSK creates one for you and uses it.
                default: Null
                type: str
            in_transit:
                description: The details for encryption in transit.
                type: dict
                suboptions:
                    in_cluster:
                        description:
                            - When set to true, it indicates that data communication among the broker nodes of the cluster is encrypted.
                              When set to false, the communication happens in plaintext.
                        type: bool
                        default: True
                    client_broker:
                        description:
                            - Indicates the encryption setting for data in transit between clients and brokers. The following are the possible values.
                              TLS means that client-broker communication is enabled with TLS only.
                              TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data.
                              PLAINTEXT means that client-broker communication is enabled in plaintext only.
                        choices:
                            - TLS
                            - TLS_PLAINTEXT
                            - PLAINTEXT
                        type: str
                        default: TLS
    authentication:
        description:
            - Includes all client authentication related information.
            - Effective only for new cluster and can not be updated.
        type: dict
        suboptions:
            tls_ca_arn:
                description: List of ACM Certificate Authority ARNs.
                type: list
                elements: str
            sasl_scram:
                description: SASL/SCRAM authentication is enabled or not.
                type: bool
            sasl_iam:
                version_added: 5.5.0
                description: IAM authentication is enabled or not.
                type: bool
            unauthenticated:
                version_added: 5.5.0
                description: Option to explicitly turn on or off authentication
                type: bool
                default: True
    enhanced_monitoring:
        description: Specifies the level of monitoring for the MSK cluster.
        choices:
            - DEFAULT
            - PER_BROKER
            - PER_TOPIC_PER_BROKER
            - PER_TOPIC_PER_PARTITION
        default: DEFAULT
        type: str
    open_monitoring:
        description: The settings for open monitoring.
        type: dict
        suboptions:
            jmx_exporter:
                description: Indicates whether you want to enable or disable the JMX Exporter.
                type: bool
                default: False
            node_exporter:
                description: Indicates whether you want to enable or disable the Node Exporter.
                type: bool
                default: False
    logging:
        description: Logging configuration.
        type: dict
        suboptions:
            cloudwatch:
                description: Details of the CloudWatch Logs destination for broker logs.
                type: dict
                suboptions:
                    enabled:
                        description: Specifies whether broker logs get sent to the specified CloudWatch Logs destination.
                        type: bool
                        default: False
                    log_group:
                        description: The CloudWatch log group that is the destination for broker logs.
                        type: str
                        required: False
            firehose:
                description: Details of the Kinesis Data Firehose delivery stream that is the destination for broker logs.
                type: dict
                suboptions:
                    enabled:
                        description: Specifies whether broker logs get send to the specified Kinesis Data Firehose delivery stream.
                        type: bool
                        default: False
                    delivery_stream:
                        description: The Kinesis Data Firehose delivery stream that is the destination for broker logs.
                        type: str
                        required: False
            s3:
                description: Details of the Amazon S3 destination for broker logs.
                type: dict
                suboptions:
                    enabled:
                        description: Specifies whether broker logs get sent to the specified Amazon S3 destination.
                        type: bool
                        default: False
                    bucket:
                        description: The name of the S3 bucket that is the destination for broker logs.
                        type: str
                        required: False
                    prefix:
                        description: The S3 prefix that is the destination for broker logs.
                        type: str
                        required: False
    wait:
        description: Whether to wait for the cluster to be available or deleted.
        type: bool
        default: false
    wait_timeout:
        description: How many seconds to wait. Cluster creation can take up to 20-30 minutes.
        type: int
        default: 3600
extends_documentation_fragment:
    - amazon.aws.aws
    - amazon.aws.ec2
    - amazon.aws.boto3
    - amazon.aws.tags
notes:
    - All operations are time consuming, for example create takes 20-30 minutes,
      update kafka version -- more than one hour, update configuration -- 10-15 minutes;
    - Cluster's brokers get evenly distributed over a number of availability zones
      that's equal to the number of subnets.
"""

EXAMPLES = r"""
# Note: These examples do not set authentication details, see the AWS Guide for details.

- community.aws.msk_cluster:
    name: kafka-cluster
    state: present
    version: 2.6.1
    nodes: 6
    ebs_volume_size: "{{ aws_msk_options.ebs_volume_size }}"
    subnets:
      - subnet-e3b48ce7c25861eeb
      - subnet-2990c8b25b07ddd43
      - subnet-d9fbeaf46c54bfab6
    wait: true
    wait_timeout: 1800
    configuration_arn: arn:aws:kafka:us-east-1:123456789012:configuration/kafka-cluster-configuration/aaaaaaaa-bbbb-4444-3333-ccccccccc-1
    configuration_revision: 1

- community.aws.msk_cluster:
    name: kafka-cluster
    state: absent
"""

RETURN = r"""
# These are examples of possible return values, and in general should use other names for return values.

bootstrap_broker_string:
    description: A list of brokers that a client application can use to bootstrap.
    type: complex
    contains:
        plain:
            description: A string containing one or more hostname:port pairs.
            type: str
        tls:
            description: A string containing one or more DNS names (or IP) and TLS port pairs.
            type: str
    returned: I(state=present) and cluster state is I(ACTIVE)
cluster_info:
    description: Description of the MSK cluster.
    type: dict
    returned: I(state=present)
response:
    description: The response from actual API call.
    type: dict
    returned: always
    sample: {}
"""

import time

try:
    import botocore
except ImportError:
    pass  # handled by AnsibleAWSModule

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import (
    camel_dict_to_snake_dict,
    compare_aws_tags,
    AWSRetry,
)


@AWSRetry.jittered_backoff(retries=5, delay=5)
def list_clusters_with_backoff(client, cluster_name):
    """
    Return the merged result of all pages of the "list_clusters" operation,
    with cluster_name passed as ClusterNameFilter
    """
    pages = client.get_paginator("list_clusters").paginate(ClusterNameFilter=cluster_name)
    return pages.build_full_result()


@AWSRetry.jittered_backoff(retries=5, delay=5)
def list_nodes_with_backoff(client, cluster_arn):
    """
    Return the merged result of all pages of the "list_nodes" operation
    for the cluster identified by cluster_arn
    """
    pages = client.get_paginator("list_nodes").paginate(ClusterArn=cluster_arn)
    return pages.build_full_result()


def find_cluster_by_name(client, module, cluster_name):
    """
    Return description of the cluster named exactly cluster_name

    :param client: kafka client used for the lookup
    :param module: module object, used to report failures
    :param cluster_name: name of the cluster to look for
    :return: cluster description (dict), empty dict when there is no such cluster
    """
    try:
        cluster_list = list_clusters_with_backoff(client, cluster_name).get("ClusterInfoList", [])
    except (
        botocore.exceptions.BotoCoreError,
        botocore.exceptions.ClientError,
    ) as e:
        module.fail_json_aws(e, "Failed to find kafka cluster by name")

    # ClusterNameFilter matches by prefix, so the listing may contain clusters
    # like "<name>-2"; only an exact match is the cluster we were asked for
    matches = [cluster for cluster in cluster_list if cluster.get("ClusterName") == cluster_name]
    if len(matches) > 1:
        module.fail_json(msg="Found more than one cluster with name '{0}'".format(cluster_name))
    return matches[0] if matches else {}


def get_cluster_state(client, module, arn):
    """
    Return the state of the cluster identified by arn,
    "DELETED" when describe_cluster reports that it is not found
    """
    try:
        description = client.describe_cluster(ClusterArn=arn, aws_retry=True)
    except client.exceptions.NotFoundException:
        # a missing cluster is reported as a state so that callers can wait for deletion
        return "DELETED"
    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as err:
        module.fail_json_aws(err, "Failed to get kafka cluster state")
    cluster_info = description["ClusterInfo"]
    return cluster_info["State"]


def get_cluster_version(client, module, arn):
    """
    Return "CurrentVersion" of the cluster identified by arn
    (the value passed as CurrentVersion to update and delete calls)
    """
    try:
        description = client.describe_cluster(ClusterArn=arn, aws_retry=True)
    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as err:
        module.fail_json_aws(err, "Failed to get kafka cluster version")
    cluster_info = description["ClusterInfo"]
    return cluster_info["CurrentVersion"]


def wait_for_cluster_state(client, module, arn, state="ACTIVE"):
    """
    Poll the cluster until it reaches the desired state

    :param client: kafka client used for polling
    :param module: module object, provides wait_timeout (seconds) and failure reporting
    :param arn: ARN of the cluster to watch
    :param state: state to wait for
    :return: None once the cluster is in the desired state; fails the module
             when wait_timeout is exceeded
    """
    # As of 2021-06 boto3 doesn't offer any built in waiters
    start = time.time()
    timeout = int(module.params.get("wait_timeout"))
    check_interval = 60
    while True:
        current_state = get_cluster_state(client, module, arn)
        if current_state == state:
            return
        # the timeout is checked only after a poll, so the state is always
        # queried at least once
        if time.time() - start > timeout:
            module.fail_json(
                msg="Timeout waiting for cluster '{0}' to reach state '{1}' (current state is '{2}')".format(
                    arn, state, current_state
                )
            )
        time.sleep(check_interval)


def prepare_create_options(module):
    """
    Return data structure for cluster create operation

    :param module: module object, its params are translated into request parameters
    :return: dict of keyword arguments for the create_cluster call
    """

    params = module.params

    broker_node_group = {
        "ClientSubnets": params["subnets"],
        "InstanceType": params["instance_type"],
    }
    if params["security_groups"]:
        broker_node_group["SecurityGroups"] = params["security_groups"]
    if params["ebs_volume_size"]:
        broker_node_group["StorageInfo"] = {
            "EbsStorageInfo": {
                "VolumeSize": params["ebs_volume_size"]
            }
        }

    c_params = {
        "ClusterName": params["name"],
        "KafkaVersion": params["version"],
        "ConfigurationInfo": {
            "Arn": params["configuration_arn"],
            "Revision": params["configuration_revision"],
        },
        "NumberOfBrokerNodes": params["nodes"],
        "BrokerNodeGroupInfo": broker_node_group,
    }

    encryption = params["encryption"]
    if encryption:
        encryption_info = {}
        if encryption.get("kms_key_id"):
            encryption_info["EncryptionAtRest"] = {
                "DataVolumeKMSKeyId": encryption["kms_key_id"]
            }
        # in_transit is None when encryption is given without it (for example
        # only kms_key_id); fall back to the defaults instead of failing on None
        in_transit = encryption.get("in_transit") or {}
        encryption_info["EncryptionInTransit"] = {
            "ClientBroker": in_transit.get("client_broker", "TLS"),
            "InCluster": in_transit.get("in_cluster", True),
        }
        c_params["EncryptionInfo"] = encryption_info

    authentication = params["authentication"]
    if authentication:
        client_authentication = {}
        sasl = {}
        if authentication.get("sasl_scram"):
            sasl["Scram"] = {"Enabled": True}
        if authentication.get("sasl_iam"):
            sasl["Iam"] = {"Enabled": True}
        if sasl:
            client_authentication["Sasl"] = sasl
        if authentication.get("tls_ca_arn"):
            client_authentication["Tls"] = {
                "CertificateAuthorityArnList": authentication["tls_ca_arn"],
                "Enabled": True,
            }
        if authentication.get("unauthenticated"):
            # NOTE(review): this replaces any Sasl/Tls settings collected above,
            # so they only take effect with unauthenticated=false -- kept as is
            # for compatibility, confirm that this is intended
            client_authentication = {
                "Unauthenticated": {"Enabled": True},
            }
        c_params["ClientAuthentication"] = client_authentication

    c_params.update(prepare_enhanced_monitoring_options(module))
    c_params.update(prepare_open_monitoring_options(module))
    c_params.update(prepare_logging_options(module))

    return c_params


def prepare_enhanced_monitoring_options(module):
    """
    Return "EnhancedMonitoring" request parameter,
    "DEFAULT" is used when enhanced_monitoring is not set
    """
    level = module.params["enhanced_monitoring"]
    if not level:
        level = "DEFAULT"
    return {"EnhancedMonitoring": level}


def prepare_open_monitoring_options(module):
    """
    Return "OpenMonitoring" request parameter,
    both exporters are disabled when open_monitoring is not set
    """
    settings = module.params["open_monitoring"]
    if not settings:
        settings = {}
    jmx_enabled = settings.get("jmx_exporter", False)
    node_enabled = settings.get("node_exporter", False)
    prometheus = {
        "JmxExporter": {"EnabledInBroker": jmx_enabled},
        "NodeExporter": {"EnabledInBroker": node_enabled},
    }
    return {"OpenMonitoring": {"Prometheus": prometheus}}


def prepare_logging_options(module):
    """
    Return "LoggingInfo" request parameter

    Every log destination is always present in the result; a destination
    that is not configured is reported as disabled.

    :param module: module object, its "logging" param is translated
    :return: dict with the single key "LoggingInfo"
    """
    logging_options = module.params["logging"] or {}

    # (request key, module option, ((request field, module sub-option), ...))
    destinations = (
        ("CloudWatchLogs", "cloudwatch", (("LogGroup", "log_group"),)),
        ("Firehose", "firehose", (("DeliveryStream", "delivery_stream"),)),
        ("S3", "s3", (("Bucket", "bucket"), ("Prefix", "prefix"))),
    )

    broker_logs = {}
    for request_key, option, fields in destinations:
        settings = logging_options.get(option)
        if not settings:
            broker_logs[request_key] = {"Enabled": False}
            continue
        destination = {"Enabled": bool(settings.get("enabled"))}
        for request_field, sub_option in fields:
            value = settings.get(sub_option)
            # unset fields are left out: None is not accepted by request
            # validation and never matches the logging info of a cluster
            if value is not None:
                destination[request_field] = value
        broker_logs[request_key] = destination

    return {
        "LoggingInfo": {
            "BrokerLogs": broker_logs
        }
    }


def create_or_update_cluster(client, module):
    """
    Create new or update existing cluster

    The cluster is looked up by the "name" module parameter. When it does not
    exist it is created; otherwise every setting listed in msk_cluster_changes
    whose current value differs from the requested one is updated, one update
    call per setting. Tags are synchronized at the end in both cases.

    :param client: kafka client used for all API calls
    :param module: module object (parameters, check mode, failure reporting)
    :return: tuple (changed, response); response holds "ClusterArn" and, for an
             existing cluster, "changes" with the result of each update call.
             In check mode (True, {}) is returned as soon as a change is detected.
    """

    changed = False
    response = {}

    cluster = find_cluster_by_name(client, module, module.params["name"])

    if not cluster:

        changed = True

        # nothing is created in check mode, so there is no ARN to report
        if module.check_mode:
            return True, {}

        create_params = prepare_create_options(module)

        try:
            response = client.create_cluster(aws_retry=True, **create_params)
        except (
            botocore.exceptions.BotoCoreError,
            botocore.exceptions.ClientError,
        ) as e:
            module.fail_json_aws(e, "Failed to create kafka cluster")

        if module.params.get("wait"):
            wait_for_cluster_state(client, module, arn=response["ClusterArn"], state="ACTIVE")

    else:

        response["ClusterArn"] = cluster["ClusterArn"]
        response["changes"] = {}

        # prepare available update methods definitions with current/target values and options
        # each key is the suffix of the client method "update_<key>" unless the
        # definition names another method in "update_method"
        msk_cluster_changes = {
            "broker_count": {
                "current_value": cluster["NumberOfBrokerNodes"],
                "target_value": module.params.get("nodes"),
                "update_params": {
                    "TargetNumberOfBrokerNodes": module.params.get("nodes")
                }
            },
            "broker_storage": {
                "current_value": cluster["BrokerNodeGroupInfo"]["StorageInfo"]["EbsStorageInfo"]["VolumeSize"],
                "target_value": module.params.get("ebs_volume_size"),
                "update_params": {
                    "TargetBrokerEBSVolumeInfo": [
                        {"KafkaBrokerNodeId": "All", "VolumeSizeGB": module.params.get("ebs_volume_size")}
                    ]
                }
            },
            "broker_type": {
                "current_value": cluster["BrokerNodeGroupInfo"]["InstanceType"],
                "target_value": module.params.get("instance_type"),
                "update_params": {
                    "TargetInstanceType": module.params.get("instance_type")
                }
            },
            "cluster_configuration": {
                "current_value": {
                    "arn": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationArn"],
                    "revision": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationRevision"],
                },
                "target_value": {
                    "arn": module.params.get("configuration_arn"),
                    "revision": module.params.get("configuration_revision"),
                },
                "update_params": {
                    "ConfigurationInfo": {
                        "Arn": module.params.get("configuration_arn"),
                        "Revision": module.params.get("configuration_revision")
                    }
                }
            },
            "cluster_kafka_version": {
                "current_value": cluster["CurrentBrokerSoftwareInfo"]["KafkaVersion"],
                "target_value": module.params.get("version"),
                "update_params": {
                    "TargetKafkaVersion": module.params.get("version")
                }
            },
            # the three definitions below are all applied via "update_monitoring",
            # each one with its own subset of parameters
            "enhanced_monitoring": {
                "current_value": cluster["EnhancedMonitoring"],
                "target_value": module.params.get("enhanced_monitoring"),
                "update_method": "update_monitoring",
                "update_params": prepare_enhanced_monitoring_options(module)
            },
            "open_monitoring": {
                "current_value": {
                    "OpenMonitoring": cluster["OpenMonitoring"]
                },
                "target_value": prepare_open_monitoring_options(module),
                "update_method": "update_monitoring",
                "update_params": prepare_open_monitoring_options(module)
            },
            "logging": {
                "current_value": {
                    "LoggingInfo": cluster["LoggingInfo"]
                },
                "target_value": prepare_logging_options(module),
                "update_method": "update_monitoring",
                "update_params": prepare_logging_options(module)
            }
        }

        for method, options in msk_cluster_changes.items():

            # optional guard: skip the update when botocore is too old for it
            # (none of the definitions above sets "botocore_version")
            if 'botocore_version' in options:
                if not module.botocore_at_least(options["botocore_version"]):
                    continue

            # the method is resolved even when no update is needed, so a missing
            # client method fails the module regardless of pending changes
            try:
                update_method = getattr(client, options.get("update_method", "update_" + method))
            except AttributeError as e:
                module.fail_json_aws(e, "There is no update method 'update_{0}'".format(method))

            if options["current_value"] != options["target_value"]:
                changed = True
                # the first detected difference is enough to report a change in check mode
                if module.check_mode:
                    return True, {}

                # need to get cluster version and check for the state because
                # there can be several updates requested but only one in time can be performed
                version = get_cluster_version(client, module, cluster["ClusterArn"])
                state = get_cluster_state(client, module, cluster["ClusterArn"])
                if state != "ACTIVE":
                    if module.params["wait"]:
                        wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE")
                    else:
                        module.fail_json(
                            msg="Cluster can be updated only in active state, current state is '{0}'. check cluster state or use wait option".format(
                                state
                            )
                        )
                try:
                    response["changes"][method] = update_method(
                        ClusterArn=cluster["ClusterArn"],
                        CurrentVersion=version,
                        **options["update_params"]
                    )
                except (
                    botocore.exceptions.BotoCoreError,
                    botocore.exceptions.ClientError,
                ) as e:
                    module.fail_json_aws(
                        e, "Failed to update cluster via 'update_{0}'".format(method)
                    )

                # with wait enabled the cluster is ACTIVE again before the next update is attempted
                if module.params["wait"]:
                    wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE")

    # reached for a new cluster and for an existing one (also in check mode when
    # no other change was detected); update_cluster_tags honours check mode itself
    changed |= update_cluster_tags(client, module, response["ClusterArn"])

    return changed, response


def update_cluster_tags(client, module, arn):
    """
    Synchronize tags of the resource identified by arn with the "tags" and
    "purge_tags" module parameters

    Return True when tags have to be added or removed (nothing is modified in
    check mode), False when they already match or "tags" is not set.
    """
    desired_tags = module.params.get('tags')
    if desired_tags is None:
        return False
    purge = module.params.get('purge_tags')

    try:
        current_tags = client.list_tags_for_resource(ResourceArn=arn, aws_retry=True)['Tags']
    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
        module.fail_json_aws(e, msg="Unable to retrieve tags for cluster '{0}'".format(arn))

    to_add, to_remove = compare_aws_tags(current_tags, desired_tags, purge_tags=purge)
    tags_differ = bool(to_add) or bool(to_remove)

    if module.check_mode:
        return tags_differ

    try:
        # removal goes first, then the new and changed tags are set
        if to_remove:
            client.untag_resource(ResourceArn=arn, TagKeys=to_remove, aws_retry=True)
        if to_add:
            client.tag_resource(ResourceArn=arn, Tags=to_add, aws_retry=True)
    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
        module.fail_json_aws(e, msg="Unable to set tags for cluster '{0}'".format(arn))

    return tags_differ


def delete_cluster(client, module):
    """
    Delete the cluster named by the "name" module parameter

    Return tuple (changed, response): (False, {}) when there is no such cluster,
    (True, <cluster description>) in check mode, otherwise (True, <response of
    the delete call with an empty "bootstrap_broker_string" added>).
    """

    cluster = find_cluster_by_name(client, module, module.params["name"])

    # a missing cluster means nothing to do, with or without check mode
    if not cluster:
        return False, {}

    if module.check_mode:
        return True, cluster

    cluster_arn = cluster["ClusterArn"]
    try:
        result = client.delete_cluster(ClusterArn=cluster_arn, CurrentVersion=cluster["CurrentVersion"])
    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
        module.fail_json_aws(e, "Failed to delete kafka cluster")

    if module.params["wait"]:
        wait_for_cluster_state(client, module, arn=cluster_arn, state="DELETED")

    result["bootstrap_broker_string"] = {}
    return True, result


def main():
    """
    Module entry point

    Builds the argument spec, validates parameters that are checked only for
    state=present (number of subnets, nodes per subnet, name length), creates,
    updates or deletes the cluster and exits with "changed",
    "bootstrap_broker_string", "cluster_info" and "response".
    """

    module_args = dict(
        name=dict(type="str", required=True),
        state=dict(type="str", choices=["present", "absent"], default="present"),
        version=dict(type="str"),
        configuration_arn=dict(type="str"),
        configuration_revision=dict(type="int"),
        nodes=dict(type="int", default=3),
        instance_type=dict(
            choices=[
                "kafka.t3.small",
                "kafka.m5.large",
                "kafka.m5.xlarge",
                "kafka.m5.2xlarge",
                "kafka.m5.4xlarge",
            ],
            default="kafka.t3.small",
        ),
        ebs_volume_size=dict(type="int", default=100),
        subnets=dict(type="list", elements="str"),
        security_groups=dict(type="list", elements="str", required=False),
        encryption=dict(
            type="dict",
            options=dict(
                kms_key_id=dict(type="str", required=False),
                in_transit=dict(
                    type="dict",
                    options=dict(
                        in_cluster=dict(type="bool", default=True),
                        client_broker=dict(
                            choices=["TLS", "TLS_PLAINTEXT", "PLAINTEXT"],
                            default="TLS"
                        ),
                    ),
                ),
            ),
        ),
        authentication=dict(
            type="dict",
            options=dict(
                tls_ca_arn=dict(type="list", elements="str", required=False),
                sasl_scram=dict(type="bool", required=False),
                sasl_iam=dict(type="bool", required=False),
                unauthenticated=dict(type="bool", default=True, required=False),
            ),
        ),
        enhanced_monitoring=dict(
            choices=[
                "DEFAULT",
                "PER_BROKER",
                "PER_TOPIC_PER_BROKER",
                "PER_TOPIC_PER_PARTITION",
            ],
            default="DEFAULT",
            required=False,
        ),
        open_monitoring=dict(
            type="dict",
            options=dict(
                jmx_exporter=dict(type="bool", default=False),
                node_exporter=dict(type="bool", default=False),
            ),
        ),
        logging=dict(
            type="dict",
            options=dict(
                cloudwatch=dict(
                    type="dict",
                    options=dict(
                        enabled=dict(type="bool", default=False),
                        log_group=dict(type="str", required=False),
                    ),
                ),
                firehose=dict(
                    type="dict",
                    options=dict(
                        enabled=dict(type="bool", default=False),
                        delivery_stream=dict(type="str", required=False),
                    ),
                ),
                s3=dict(
                    type="dict",
                    options=dict(
                        enabled=dict(type="bool", default=False),
                        bucket=dict(type="str", required=False),
                        prefix=dict(type="str", required=False),
                    ),
                ),
            ),
        ),
        wait=dict(type="bool", default=False),
        wait_timeout=dict(type="int", default=3600),
        tags=dict(type='dict', aliases=['resource_tags']),
        purge_tags=dict(type='bool', default=True),
    )

    module = AnsibleAWSModule(
        argument_spec=module_args,
        required_if=[['state', 'present', ['version', 'configuration_arn', 'configuration_revision', 'subnets']]],
        supports_check_mode=True
    )

    client = module.client("kafka", retry_decorator=AWSRetry.jittered_backoff())

    if module.params["state"] == "present":
        # these checks apply to cluster creation/update only, deletion needs just the name
        subnets = module.params["subnets"]
        if len(subnets) < 2:
            module.fail_json(
                msg="At least two client subnets should be provided"
            )
        if int(module.params["nodes"]) % len(subnets) != 0:
            module.fail_json(
                msg="The number of broker nodes must be a multiple of availability zones in the subnets parameter"
            )
        if len(module.params["name"]) > 64:
            module.fail_json(msg='Cluster name "{0}" exceeds 64 character limit'.format(module.params["name"]))
        changed, response = create_or_update_cluster(client, module)
    elif module.params["state"] == "absent":
        changed, response = delete_cluster(client, module)

    cluster_info = {}
    bootstrap_broker_string = {}
    # cluster details are reported only for state=present and only when the ARN
    # is known (it is not in check mode when a change was detected)
    if response.get("ClusterArn") and module.params["state"] == "present":
        try:
            cluster_info = client.describe_cluster(ClusterArn=response["ClusterArn"], aws_retry=True)[
                "ClusterInfo"
            ]
            # bootstrap brokers are requested only for a cluster in ACTIVE state
            if cluster_info.get("State") == "ACTIVE":
                brokers = client.get_bootstrap_brokers(ClusterArn=response["ClusterArn"], aws_retry=True)
                if brokers.get("BootstrapBrokerString"):
                    bootstrap_broker_string["plain"] = brokers["BootstrapBrokerString"]
                if brokers.get("BootstrapBrokerStringTls"):
                    bootstrap_broker_string["tls"] = brokers["BootstrapBrokerStringTls"]
        except (
            botocore.exceptions.BotoCoreError,
            botocore.exceptions.ClientError,
        ) as e:
            module.fail_json_aws(
                e,
                "Can not obtain information about cluster {0}".format(
                    response["ClusterArn"]
                ),
            )

    module.exit_json(
        changed=changed,
        bootstrap_broker_string=bootstrap_broker_string,
        cluster_info=camel_dict_to_snake_dict(cluster_info),
        response=camel_dict_to_snake_dict(response),
    )


# Call main() only when the file is run as a script, not when it is imported
if __name__ == "__main__":
    main()

Anon7 - 2022
AnonSec Team