import json
import boto3

# Resource types this rule validates; any other type is reported as
# NOT_APPLICABLE.
APPLICABLE_RESOURCES = [
    "AWS::EC2::Instance",
    "AWS::S3::Bucket",
    "AWS::Redshift::Cluster",
    "AWS::EC2::Volume",
    "AWS::RDS::DBInstance",
]

# Addresses used for the violation notification email.
# Change these before deploying.
VIOLATION_EMAIL_SENDER = 'lecadou@gmail.com'
VIOLATION_EMAIL_RECEIVER = 'lecadou@gmail.com'

# Upper bound applied to the annotation sent to AWS Config
# (NOTE(review): PutEvaluations documents a 256-character limit -- confirm).
MAX_ANNOTATION_LENGTH = 256

# Get current user account id.
sts_client = boto3.client('sts')
acct_id = sts_client.get_caller_identity()['Account']


def find_violation(current_tags, required_tags):
    """Check a resource's tags against the required tags.

    Args:
        current_tags: Mapping of tag key -> tag value found on the resource.
            ``None`` is treated as "no tags".
        required_tags: Mapping of required tag key -> comma-separated string
            of accepted values. The value ``*`` accepts any non-empty value.

    Returns:
        A message with one line per value mismatch plus one line listing the
        missing tags, or ``None`` when every required tag is present with an
        accepted value.
    """
    current_tags = current_tags or {}
    mismatches = []
    missing = []

    for rtag in required_tags:
        if rtag not in current_tags:
            missing.append(str(rtag))
            continue

        allowed_spec = required_tags[rtag]
        value = current_tags[rtag]
        # Tolerate "a, b" style specs by ignoring whitespace around values.
        allowed_values = [item.strip() for item in allowed_spec.split(",")]
        value_match = (
            # A value equal to the full spec was accepted historically;
            # keep accepting it.
            value == allowed_spec
            or value in allowed_values
            # The wildcard only requires the tag to have a non-empty value.
            or (value != "" and "*" in allowed_values)
        )
        if not value_match:
            mismatches.append(
                "%s doesn't match any of %s!" % (value, allowed_spec)
            )

    messages = list(mismatches)
    if missing:
        messages.append('%s tags are not present' % ', '.join(missing))
    if not messages:
        return None
    return "\n".join(messages)


def _send_violation_email(resource_arn, violation):
    """Send the violation text for *resource_arn* by email through SES."""
    ses_client = boto3.client('ses')
    ses_client.send_email(
        Destination={'ToAddresses': [VIOLATION_EMAIL_RECEIVER]},
        Message={
            'Subject': {'Data': 'AWS Config Violation- Billing'},
            'Body': {
                'Text': {
                    'Data': 'Resource %s - %s' % (resource_arn, violation),
                },
            },
        },
        Source=VIOLATION_EMAIL_SENDER,
    )


def evaluate_compliance(configuration_item, rule_parameters):
    """Evaluate one configuration item against the required-tag rule.

    Args:
        configuration_item: The ``configurationItem`` dict from the invoking
            event; ``resourceType``, ``configurationItemStatus``, ``tags``
            and (on violation) ``ARN`` are read from it.
        rule_parameters: Mapping of required tag key -> comma-separated
            accepted values, as consumed by ``find_violation``.

    Returns:
        A dict with ``compliance_type`` (``NOT_APPLICABLE``,
        ``NON_COMPLIANT`` or ``COMPLIANT``) and a human-readable
        ``annotation``.
    """
    resource_type = configuration_item["resourceType"]
    if resource_type not in APPLICABLE_RESOURCES:
        return {
            "compliance_type": "NOT_APPLICABLE",
            "annotation": "The rule doesn't apply to resources of type "
                          + resource_type + "."
        }

    if configuration_item["configurationItemStatus"] == "ResourceDeleted":
        return {
            "compliance_type": "NOT_APPLICABLE",
            "annotation": "The configurationItem was deleted and therefore "
                          "cannot be validated."
        }

    # "tags" may be absent or null on the item; treat both as "no tags".
    current_tags = configuration_item.get("tags") or {}
    violation = find_violation(current_tags, rule_parameters)

    if violation:
        # Send an email about the violation; the caller records it in
        # AWS Config.
        _send_violation_email(configuration_item['ARN'], violation)
        return {
            "compliance_type": "NON_COMPLIANT",
            "annotation": violation
        }

    return {
        "compliance_type": "COMPLIANT",
        "annotation": "This resource is compliant with the rule."
    }


def lambda_handler(event, context):
    """Lambda entry point: evaluate the changed resource and report the result.

    Reads ``invokingEvent``, ``ruleParameters`` and (optionally)
    ``resultToken`` from *event*, evaluates the configuration item, and
    submits a single evaluation with ``put_evaluations``. *context* is
    unused.
    """
    print(json.dumps(event, indent=4))

    invoking_event = json.loads(event["invokingEvent"])
    configuration_item = invoking_event["configurationItem"]
    rule_parameters = json.loads(event["ruleParameters"])
    result_token = event.get("resultToken", "No token found.")

    evaluation = evaluate_compliance(configuration_item, rule_parameters)

    config = boto3.client("config")
    config.put_evaluations(
        Evaluations=[
            {
                "ComplianceResourceType": configuration_item["resourceType"],
                "ComplianceResourceId": configuration_item["resourceId"],
                "ComplianceType": evaluation["compliance_type"],
                # Bound the annotation so a long violation list cannot make
                # the whole evaluation fail validation.
                "Annotation": evaluation["annotation"][:MAX_ANNOTATION_LENGTH],
                "OrderingTimestamp":
                    configuration_item["configurationItemCaptureTime"]
            },
        ],
        ResultToken=result_token
    )
Run
Reset
Share
Import
Link
Embed
Language▼
English
中文
Python Fiddle
Python Cloud IDE
Follow @python_fiddle
Browser Version Not Supported
Due to Python Fiddle's reliance on advanced JavaScript techniques, older browsers might have problems running it correctly. Please download the latest version of your favourite browser.
Chrome 10+
Firefox 4+
Safari 5+
IE 10+
Let me try anyway!
url:
Go
Python Snippet
Stackoverflow Question