Surece: https://idk.dev/python-code-to-download-dms-task-logs-using-the-aws-dms-task-id/
Slightly modified script still need to fix the datetime issue and adat for AWS Lambda.
import boto3, json, sys, time
from os import environ
import datetime
def start_time_milliseconds_since_epoch(): #time_string):
ts = int(time.time()*1000) - 8640000
print('Start time', ts)
return(ts)
def end_time_milliseconds_since_epoch():
ts = int(time.time()*1000)
print('End time ', ts)
return(ts)
def get_replication_tasks():
client = boto3.client('dms')
response = client.describe_replication_tasks(Filters=[
{
'Name': 'replication-task-id',
'Values': [
replication_task_id,
]
},
],
MaxRecords=100,
Marker='')
return response['ReplicationTasks']
def get_replication_instance_arn():
for ReplicationTasks in get_replication_tasks():
ReplicationInstanceArn = ReplicationTasks['ReplicationInstanceArn']
return ReplicationInstanceArn
def get_replication_instances():
client = boto3.client('dms')
response = client.describe_replication_instances(Filters=[
{
'Name': 'rep-instance-arn',
'Values': [
rep_instance_arn,
]
},
],
MaxRecords=100,
Marker='')
return response['ReplicationInstances']
def get_replication_instance_id():
for ReplicationInstances in get_replication_instances():
ReplicationInstanceIdentifier = ReplicationInstances['ReplicationInstanceIdentifier']
return ReplicationInstanceIdentifier
def get_cloudwatch_log_events(log_group):
client = boto3.client('logs')
kwargs = {
'logGroupName': log_group,
'limit': 1000,
'startTime': start_time,
'endTime': end_time
}
while True:
response = client.filter_log_events(**kwargs)
yield from response['events']
try:
kwargs['nextToken'] = response['nextToken']
except KeyError:
break
replication_task_id = 'celonis-dms-s3publish-part1'
rep_instance_arn = get_replication_instance_arn()
start_time = 1596672000000 #start_time_milliseconds_since_epoch()
end_time = 1596758400000 #end_time_milliseconds_since_epoch()
log_group = "dms-tasks-" + get_replication_instance_id()
# print(end_time - start_time)
def main():
for event in get_cloudwatch_log_events(log_group):
sys.stdout.write(event['message'].rstrip() + 'n')
if __name__ == '__main__':
main()