Python code to download DMS Task Logs using the AWS DMS Task ID

Source: https://idk.dev/python-code-to-download-dms-task-logs-using-the-aws-dms-task-id/

Slightly modified script; still need to fix the datetime issue and adapt it for AWS Lambda.

import boto3, json, sys, time
from os import environ
import datetime

def start_time_milliseconds_since_epoch(lookback_ms: int = 8640000) -> int:
    """Return the start of the log window as milliseconds since the epoch.

    The start is the current wall-clock time minus ``lookback_ms``. The
    computed value is also printed to stdout.

    Args:
        lookback_ms: How far before "now" the window starts, in
            milliseconds. Defaults to 8640000, the offset that used to be
            hard-coded here.

    Returns:
        Whole milliseconds since the Unix epoch.
    """
    # NOTE(review): the default of 8,640,000 ms is 2.4 hours, not 24 hours
    # (86,400,000 ms) -- confirm which look-back was actually intended.
    ts = int(time.time() * 1000) - lookback_ms
    print('Start time', ts)
    return ts


def end_time_milliseconds_since_epoch():
    """Return the current wall-clock time as whole milliseconds since the epoch.

    The value is also echoed to stdout with an 'End time  ' label.
    """
    now_ms = int(time.time() * 1000)
    print('End time  ', now_ms)
    return now_ms


def get_replication_tasks():
    """Describe the DMS replication tasks matching ``replication_task_id``.

    The task identifier is read from the module-level
    ``replication_task_id``. A single ``describe_replication_tasks`` request
    is made (no follow-up pages are requested) and the ``ReplicationTasks``
    entry of the response is returned.
    """
    dms = boto3.client('dms')

    # Restrict the lookup to the one configured task identifier.
    task_filter = {
        'Name': 'replication-task-id',
        'Values': [replication_task_id],
    }
    result = dms.describe_replication_tasks(
        Filters=[task_filter],
        MaxRecords=100,
        Marker='',
    )
    return result['ReplicationTasks']

def get_replication_instance_arn():
    """Return the ``ReplicationInstanceArn`` of the first replication task.

    Only the first entry produced by ``get_replication_tasks()`` is
    inspected; ``None`` is returned when there are no entries.
    """
    arns = (task['ReplicationInstanceArn'] for task in get_replication_tasks())
    return next(arns, None)

def get_replication_instances():
    """Describe the DMS replication instances matching ``rep_instance_arn``.

    The ARN is read from the module-level ``rep_instance_arn``. A single
    ``describe_replication_instances`` request is made (no follow-up pages
    are requested) and the ``ReplicationInstances`` entry of the response
    is returned.
    """
    dms = boto3.client('dms')

    # Restrict the lookup to the one replication instance ARN.
    instance_filter = {
        'Name': 'rep-instance-arn',
        'Values': [rep_instance_arn],
    }
    result = dms.describe_replication_instances(
        Filters=[instance_filter],
        MaxRecords=100,
        Marker='',
    )
    return result['ReplicationInstances']

def get_replication_instance_id():
    """Return the ``ReplicationInstanceIdentifier`` of the first instance.

    Only the first entry produced by ``get_replication_instances()`` is
    inspected; ``None`` is returned when there are no entries.
    """
    identifiers = (
        instance['ReplicationInstanceIdentifier']
        for instance in get_replication_instances()
    )
    return next(identifiers, None)

def get_cloudwatch_log_events(log_group):
    """Yield each CloudWatch Logs event of ``log_group`` in the time window.

    The window bounds come from the module-level ``start_time`` and
    ``end_time``. ``filter_log_events`` is called with ``limit=1000``
    repeatedly, feeding each response's ``nextToken`` into the next
    request, until a response carries no ``nextToken``.
    """
    logs = boto3.client('logs')
    request = dict(
        logGroupName=log_group,
        limit=1000,
        startTime=start_time,
        endTime=end_time,
    )
    while True:
        page = logs.filter_log_events(**request)
        yield from page['events']
        # No continuation token means this was the final page.
        if 'nextToken' not in page:
            return
        request['nextToken'] = page['nextToken']

# --- Script configuration -------------------------------------------------
# These statements run when the module is executed/imported and their order
# matters: each helper called below reads a global assigned just above it.

# Identifier of the DMS task whose logs are fetched; read by
# get_replication_tasks().
replication_task_id = 'celonis-dms-s3publish-part1'
# Read by get_replication_instances(), so it must be assigned before
# get_replication_instance_id() is called further down.
rep_instance_arn = get_replication_instance_arn()

# Fixed log window in epoch milliseconds:
# 2020-08-06 00:00:00 UTC up to 2020-08-07 00:00:00 UTC (86,400,000 ms).
start_time = 1596672000000  # dynamic alternative: start_time_milliseconds_since_epoch()
end_time = 1596758400000  # dynamic alternative: end_time_milliseconds_since_epoch()

# Log group name: the "dms-tasks-" prefix followed by the replication
# instance identifier.
log_group = "dms-tasks-" + get_replication_instance_id()

# Debug aid: window length in milliseconds.
# print(end_time - start_time)

def main():
    """Write every fetched log message to stdout, one message per line.

    Events are read from ``get_cloudwatch_log_events(log_group)``. Trailing
    whitespace is stripped from each event's ``message`` before a single
    newline is appended.
    """
    for event in get_cloudwatch_log_events(log_group):
        # Terminate each message with a real newline ('\n'); appending the
        # bare letter 'n' ran all messages together on one line.
        sys.stdout.write(event['message'].rstrip() + '\n')

# Run the log dump only when this file is executed as a script.
if "__main__" == __name__:
    main()

Leave a Reply

Your email address will not be published. Required fields are marked *