from sys import argv
import subprocess
import sys
import time
import string
import os
import shutil
import calendar

## YYYY-MM-DD format
currDt = time.strftime("%Y-%m-%d")

#Execution
def ExecuteFn(TDCHFILE, EXEC_FLAG):
    subprocess.Popen("nohup sh "+TDCHFILE+" >> "+TDCH_LOGDIR+"/"+TDCHFILE+".log &",
                     shell=True, stdout=subprocess.PIPE)

#TDCH command creation when split type is AMP
def SplitAmpFn(<All the parameters>):
    #Check the number of columns in WHR_COL and return the related command.
    cnt = len(WHR_COL)
    WHR_COL1 = WHR_COL[0]
    if cnt == 2:
        WHR_COL2 = WHR_COL[1]
        return ("hadoop jar "+TDCH_JAR+" com.teradata.connector.common.tool.ConnectorImportTool"
                " -classname com.teradata.jdbc.TeraDriver -nummappers "+MAPPERS+
                " -url jdbc:teradata://IDW/DATABASE="+DATABASE+",TMODE="+TMODE+",LOGMECH="+LDAP+
                " -username "+HDTD_LOGIN+" -password "+HDTD_PASSWD+
                " -jobtype hdfs -fileformat textfile -method split.by."+SPLIT_TYPE+
                " -sourcetable "+SRCTBL_NM+
                " -sourceconditions '"+WHR_COL1+" > "+START_DT+" AND "+WHR_COL2+" <= "+END_DT+"'"
                " -separator '|' -targetpaths "+TGTTBL_NM)
    else:
        return ("hadoop jar "+TDCH_JAR+" com.teradata.connector.common.tool.ConnectorImportTool"
                " -classname com.teradata.jdbc.TeraDriver -nummappers "+MAPPERS+
                " -url jdbc:teradata://IDW/DATABASE="+DATABASE+",TMODE="+TMODE+",LOGMECH="+LDAP+
                " -username "+HDTD_LOGIN+" -password "+HDTD_PASSWD+
                " -jobtype hdfs -fileformat textfile -method split.by."+SPLIT_TYPE+
                " -sourcetable "+SRCTBL_NM+
                " -sourceconditions '"+WHR_COL1+" BETWEEN "+START_DT+" AND "+END_DT+"'"
                " -separator '|' -targetpaths "+TGTTBL_NM)

#TDCH command creation when split type is Hash/Value
def SplitNonAmpFn(<All the parameters>):
    #Check the number of columns in WHR_COL and return the related command.
    cnt = len(WHR_COL)
    WHR_COL1 = WHR_COL[0]
    if cnt == 2:
        WHR_COL2 = WHR_COL[1]
        return ("hadoop jar "+TDCH_JAR+" com.teradata.connector.common.tool.ConnectorImportTool"
                " -classname com.teradata.jdbc.TeraDriver -nummappers "+MAPPERS+
                " -url jdbc:teradata://IDW/DATABASE="+DATABASE+",TMODE="+TMODE+",LOGMECH="+LDAP+
                " -username "+HDTD_LOGIN+" -password "+HDTD_PASSWD+
                " -jobtype hdfs -fileformat textfile -method split.by."+SPLIT_TYPE+
                " -splitbycolumn "+SPLIT_COLUMN+
                " -sourcetable "+SRCTBL_NM+
                " -sourceconditions '"+WHR_COL1+" > "+START_DT+" AND "+WHR_COL2+" <= "+END_DT+"'"
                " -separator '|' -targetpaths "+TGTTBL_NM)
    else:
        return ("hadoop jar "+TDCH_JAR+" com.teradata.connector.common.tool.ConnectorImportTool"
                " -classname com.teradata.jdbc.TeraDriver -nummappers "+MAPPERS+
                " -url jdbc:teradata://IDW/DATABASE="+DATABASE+",TMODE="+TMODE+",LOGMECH="+LDAP+
                " -username "+HDTD_LOGIN+" -password "+HDTD_PASSWD+
                " -jobtype hdfs -fileformat textfile -method split.by."+SPLIT_TYPE+
                " -splitbycolumn "+SPLIT_COLUMN+
                " -sourcetable "+SRCTBL_NM+
                " -sourceconditions '"+WHR_COL1+" BETWEEN "+START_DT+" AND "+END_DT+"'"
                " -separator '|' -targetpaths "+TGTTBL_NM)

#TDCH command generation function, which calls the two functions above.
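# --- Hedged sketch (not part of the original script) -------------------------
# The long '+' concatenations above are easy to get wrong (a missing '+' or a
# missing space silently corrupts the TDCH command). A minimal alternative,
# assuming the same global names already used above (TDCH_JAR, MAPPERS,
# DATABASE, TMODE, LDAP, HDTD_LOGIN, HDTD_PASSWD, SPLIT_TYPE, SRCTBL_NM,
# WHR_COL, START_DT, END_DT, TGTTBL_NM), is to collect the pieces in a list
# and join them once:
def BuildTdchCommandSketch():
    parts = [
        "hadoop jar", TDCH_JAR,
        "com.teradata.connector.common.tool.ConnectorImportTool",
        "-classname com.teradata.jdbc.TeraDriver",
        "-nummappers", MAPPERS,
        "-url jdbc:teradata://IDW/DATABASE="+DATABASE+",TMODE="+TMODE+",LOGMECH="+LDAP,
        "-username", HDTD_LOGIN,
        "-password", HDTD_PASSWD,
        "-jobtype hdfs -fileformat textfile",
        "-method split.by."+SPLIT_TYPE,
        "-sourcetable", SRCTBL_NM,
        "-sourceconditions '"+WHR_COL[0]+" BETWEEN "+START_DT+" AND "+END_DT+"'",
        "-separator '|'",
        "-targetpaths", TGTTBL_NM,
    ]
    return " ".join(parts)
# ------------------------------------------------------------------------------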
def TDCHGenFn(<All the parameters>):
    if SPLIT_TYPE == 'AMP':
        #Call the split-by-AMP function
        TDCHCommand = SplitAmpFn(<All the parameters>)
    else:
        #Call the split-by-hash/value function
        TDCHCommand = SplitNonAmpFn(<All the parameters>)
    TDCHFILE = TDCH_SCRIPTS_DIR+'/'+ESP_NM+'_'+DATABASE+'_'+TABLE+'.sh'
    tdch_write = open(TDCHFILE, 'w')
    tdch_write.write("PID=`echo $$` \n echo '"+DB_Name+"|"+SRCTBL_NM+"|'"+"$PID >> "+PIDFILE+"\n")
    tdch_write.write(TDCHCommand)
    tdch_write.write("\n retcode=$? \n")
    tdch_write.write("if [[ ${retcode} != 0 ]]; then \n")
    tdch_write.write("printf 'Below TDCH command failed \n '"+TDCHCommand+
                     " | mailx -s 'TDCH job failure notification' hanumantharao.valluri@compass.com,rajesh.thota@compass.com \n"
                     " touch "+PIDDIR_TEMP+"/$PID'_FAL.txt' \n exit 1 \n"
                     " else \n touch "+PIDDIR_TEMP+"/$PID'_SCS.txt' \n fi")
    tdch_write.close()
    return TDCHFILE

def SCDLoadFn(<All the parameters>):
    #Call the SCD next steps
    #Call a Hive shell script to perform the steps below.
    '''
    1. Read PIDFILE and get the PID for the DATABASE and TABLE combination.
    2. Check for the availability of the PID_SCS/PID_FAL files under the PIDDIR_TEMP directory.
    3. If PID_SCS was created, proceed with the Hive execution.
    4. If PID_FAL was created, send a mail saying the TDCH job failed, skip the Hive execution and exit.
    '''

def FullLoadFn(<All the parameters>):
    #Call the Full load next steps
    '''
    1. Read PIDFILE and get the PID for the DATABASE and TABLE combination.
    2. Check for the availability of the PID_SCS/PID_FAL files under the PIDDIR_TEMP directory.
    3. If PID_SCS was created, proceed with the Hive execution.
    4. If PID_FAL was created, send a mail saying the TDCH job failed, skip the Hive execution and exit.
    '''

def AppendLoadFn(<All the parameters>):
    #Call the Append next steps
    '''
    1. Read PIDFILE and get the PID for the DATABASE and TABLE combination.
    2. Check for the availability of the PID_SCS/PID_FAL files under the PIDDIR_TEMP directory.
    3. If PID_SCS was created, proceed with the Hive execution.
    4. If PID_FAL was created, send a mail saying the TDCH job failed, skip the Hive execution and exit.
    '''
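# --- Hedged sketch (not part of the original script) -------------------------
# The three load functions above all describe the same PID handshake. A
# minimal illustration of that check, assuming the PIDFILE record format
# written by TDCHGenFn ("<db>|<table>|<pid>") and the "<pid>_SCS.txt" /
# "<pid>_FAL.txt" marker files touched under PIDDIR_TEMP by the generated
# shell script:
def CheckTdchStatusSketch(db_name, table_name):
    pid = None
    with open(PIDFILE) as pf:
        for rec in pf:
            fields = rec.strip().split('|')
            if fields[0] == db_name and fields[1] == table_name:
                pid = fields[2]
    if pid is None:
        return 'UNKNOWN'
    if os.path.exists(PIDDIR_TEMP+'/'+pid+'_SCS.txt'):
        return 'SUCCESS'   # safe to start the Hive step
    if os.path.exists(PIDDIR_TEMP+'/'+pid+'_FAL.txt'):
        return 'FAILED'    # send the failure mail and exit
    return 'RUNNING'
# ------------------------------------------------------------------------------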
# Using Split_Flag, Start_Date and End_Date, create one record per range in the DATE_RANGE_OUT file, based on Split_Flag (Y=yearly, M=monthly)
def DateRangeFn():
    if Split_Flag == 'Y':
        i = int(Start_Date[:4])
        while(i < (int(End_Date[:4])+1)):
            MM_DD_Start_Date = Start_Date[5:]
            CH_Start_Date = (Start_Date[:4])+(MM_DD_Start_Date[:2])+(Start_Date[8:])
            New_Start_Date = CH_Start_Date[:4]+'-'+MM_DD_Start_Date
            MM_DD_End_Date = End_Date[5:]
            CH_End_Date = (New_Start_Date[:4])+(MM_DD_End_Date[:2])+(End_Date[8:])
            New_End_Date = CH_End_Date[:4]+'-'+MM_DD_End_Date
            result = DB_Name+'|'+TBL_Name+'|'+New_Start_Date+'|'+New_End_Date+'|'+Split_Flag+'|'+'N'
            DateFile_write = open(DATE_RANGE_OUT, 'a+')
            DateFile_write.write(result)
            DateFile_write.write("\n")
            i = int(CH_Start_Date[:4])
            i = i+1
            j = str(i)
            Start_Date = j[:4]+Start_Date[4:]
        DateFile_write.close()
    if Split_Flag == 'M':
        New_Start_Date = Start_Date
        New_End_Date = Start_Date
        maxmonth = 13
        while(New_End_Date < End_Date):
            MM_DD_Start_Date = New_Start_Date[5:]
            i = int(MM_DD_Start_Date[:2])
            Year_string = int(New_Start_Date[:4])
            New_End_Day = calendar.monthrange(Year_string, i)[1]
            New_End_Date = (New_Start_Date[:5])+(MM_DD_Start_Date[:3])+str(New_End_Day)
            result = DB_Name+'|'+TBL_Name+'|'+New_Start_Date+'|'+New_End_Date+'|'+Split_Flag+'|'+'N'
            DateFile_write = open(DATE_RANGE_OUT, 'a')
            DateFile_write.write(result)
            DateFile_write.write("\n")
            i = i+1
            j = str(i)
            if i < maxmonth:
                j = format(i, '02')
                New_End_Month = j
                New_Start_Year = New_Start_Date[:4]
            if i == 13:
                New_Start_Year_int = int(New_Start_Date[:4])+1
                New_Start_Year = str(New_Start_Year_int)
                New_End_Month = '01'
            New_Start_Date = New_Start_Year+'-'+New_End_Month+New_Start_Date[7:]
        DateFile_write.close()

#################### Different Types of Load Function Methods for the Date Range Functionality #############
def LoadTypeFn(DBNAME, TBL, STDT, ENDT, DRFRQ, DRUPDFLG):
    TDCHFILE = TDCHGenFn(<All the parameters>)
    ExecuteFn(TDCHFILE, 'M')
    if LOAD_TYPE == 'SCD':
        SCDLoadFn(<All the parameters>)
    elif LOAD_TYPE == 'FULL':
        FullLoadFn(<All the parameters>)
    elif LOAD_TYPE == 'APPEND':
        AppendLoadFn(<All the parameters>)

#### Read the records in DATE_RANGE_OUT and call the LoadTypeFn method for every record
def DateRangeRead():
    with open(DATE_RANGE_OUT, 'r') as infile:
        for l in infile:
            line = l.strip().split('|')
            DR_DB_NAME = line[0]
            DR_TBL_NAME = line[1]
            DR_ST_DT = line[2]
            DR_END_DT = line[3]
            DR_FRQ = line[4]
            DR_UPDT_FLAG = line[5]
            LoadTypeFn(DR_DB_NAME, DR_TBL_NAME, DR_ST_DT, DR_END_DT, DR_FRQ, DR_UPDT_FLAG)

#Date Range record - update RUN_FLAG to 'Y' in a new file
#Format: XYZ|ABC|2010-01-01|2010-12-31|M|N
def DateRangeUPDT():
    with open(DATE_RANGE_OUT, 'r') as infile:
        for l in infile:
            line = l.strip().split('|')
            DR_DB_NAME = line[0]
            DR_TBL_NAME = line[1]
            DR_ST_DT = line[2]
            DR_END_DT = line[3]
            DR_FRQ = line[4]
            DR_UPDT_FLAG = line[5]
            result = DR_DB_NAME+'|'+DR_TBL_NAME+'|'+DR_ST_DT+'|'+DR_END_DT+'|'+DR_FRQ+'|'+'Y'
            with open(DATE_RANGE_OUTUPDT, 'a') as output_file:
                print result
                output_file.write(result)
                output_file.write("\n")

def DateRange_NotHistory():
    with open(DATE_RANGE_DAY_FILE, 'r') as infile:
        for l in infile:
            line = l.strip().split('|')
            DR_DB_NAME = line[0]
            DR_TBL_NAME = line[1]
            DR_ST_DT = line[2]
            DR_END_DT = line[3]
            DR_FRQ = line[4]
            DR_LOAD_STATUS = line[5]
            if DR_LOAD_STATUS == 'SUCCESS':
                result = DR_DB_NAME+'|'+DR_TBL_NAME+'|'+DR_ST_DT+'|'+DR_END_DT+'|'+DR_FRQ+'|'+'N'
                with open(DATE_RANGE_OUT, 'a') as output_file:
                    print result
                    output_file.write(result)
                    output_file.write("\n")
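# --- Hedged sketch (not part of the original script) -------------------------
# Illustration only: the monthly branch of DateRangeFn above expands a
# start/end date into one record per calendar month. A compact, self-contained
# version of that date arithmetic (using datetime instead of string slicing);
# the function name is hypothetical:
from datetime import date

def MonthlyRangesSketch(start_date, end_date):
    # start_date/end_date are 'YYYY-MM-DD' strings, as in DATE_RANGE_OUT
    cur = date(*map(int, start_date.split('-')))
    end = date(*map(int, end_date.split('-')))
    ranges = []
    while cur <= end:
        last_day = calendar.monthrange(cur.year, cur.month)[1]
        month_end = date(cur.year, cur.month, last_day)
        ranges.append((cur.isoformat(), min(month_end, end).isoformat()))
        # jump to the first day of the next month
        cur = date(cur.year + (cur.month == 12), cur.month % 12 + 1, 1)
    return ranges

# e.g. MonthlyRangesSketch('2010-01-15', '2010-03-31') ->
# [('2010-01-15', '2010-01-31'), ('2010-02-01', '2010-02-28'), ('2010-03-01', '2010-03-31')]
# ------------------------------------------------------------------------------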
def NotHistoryFn():
    if LOAD_FREQ != 'H':
        GRP_COL = SRC_DB+','+SRCTBL_NM+','+LoadFreqUsed
        hive_cmd = ("hive -e \"select SRC_DB,SRCTBL_NM,ST_DT,END_DT,LoadFreqUsed,LoadStatus,max(LoadEndDate) "
                    "from td_auto.audit_table GROUP BY "+GRP_COL+";\" >> "+DATE_RANGE_DAY_FILE)
        retcode = subprocess.call(hive_cmd, shell=True)
        if retcode == 0:
            print "Hive query on the audit table for the daily load executed successfully"
            DateRange_NotHistory()
            DateRangeRead()
        else:
            print "Hive query on the audit table for the daily load failed .. logs can be found in HiveLogFile"

########## Date Range functionality starts here; it executes only when LOAD_FREQ=='H' and PARM_ST_DATE !='NA' and PARM_END_DATE !='NA' ###########
###### Call the Date Range function methods only when LOAD_FREQ is 'H' (History)
def DateRangeFnCheck():
    if (LOAD_FREQ == 'H' and PARM_ST_DATE != 'NA' and PARM_END_DATE != 'NA'):
        DateRangeFn()
        DateRangeRead()
        DateRangeUPDT()
    else:
        NotHistoryFn()

#Load functions
def MultiTableLoad(<All the parameters>):
    with open(OUTFILE_JOB) as f:
        for rec in f:
            line = rec.strip().split('|')
            (JOB_NM, SRC_DB, TGD_DB, SRCTBL_NM, TGTTBL_NM, PRIMARY_KEYS,
             LOAD_TYPE, LOAD_FREQ, SPLIT_TYPE, SPLIT_COL, SRC_PPI, WHR_COLS,
             TGT_PARTITIONS, TGT_PARTITION_TYPE, TGT_PARTITION_FMT,
             BUCKET_COLS, NUM_BUCKETS, TGT_DDL_FILE, RUN_FLAG, GRP_ID) = line[:20]
            WHR_COL = WHR_COLS.split(',')
            #Check if the LOAD_FREQ is 'H' and call the Date Range method if yes
            DateRangeFnCheck()
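# --- Hedged sketch (not part of the original script) -------------------------
# Illustration only: the pipe-delimited job-detail records are unpacked
# positionally into ~20 variables above and again in the main method. Mapping
# each record to a dict keyed by the column names keeps the field order in one
# place; JOB_COLUMNS simply mirrors the fields this script already uses:
JOB_COLUMNS = [
    'JOB_NM', 'SRC_DB', 'TGD_DB', 'SRCTBL_NM', 'TGTTBL_NM', 'PRIMARY_KEYS',
    'LOAD_TYPE', 'LOAD_FREQ', 'SPLIT_TYPE', 'SPLIT_COL', 'SRC_PPI', 'WHR_COLS',
    'TGT_PARTITIONS', 'TGT_PARTITION_TYPE', 'TGT_PARTITION_FMT',
    'BUCKET_COLS', 'NUM_BUCKETS', 'TGT_DDL_FILE', 'RUN_FLAG', 'GRP_ID',
]

def ParseJobRecordSketch(rec):
    return dict(zip(JOB_COLUMNS, rec.strip().split('|')))

# e.g. job = ParseJobRecordSketch(line); job['LOAD_TYPE'] -> 'SCD'
# ------------------------------------------------------------------------------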
##################################### MAIN METHOD ########################################
#Main method starts here
script, ESP_NM = argv
TDCH_LOGDIR = '/hadoop/user/hanumantharao/td_auto/TDCHlogs/'
HIVE_LOGDIR = '/hadoop/user/hanumantharao/td_auto/Hivelogs/'
TDCH_SCRIPTS_DIR = '/hadoop/user/hanumantharao/td_auto/tdch_commands'
PIDDIR_TEMP = '/hadoop/user/hanumantharao/td_auto/temp/PID/'
PIDFILE = '/hadoop/user/hanumantharao/td_auto/temp/PID_MAPPING.txt'
PIDOUT_DIR = '/hadoop/user/rajesh.thota/InputFile/'
DATE_RANGE_OUT = '/hadoop/user/rajesh.thota/Test/TD_Files/'+'DATE_RANGE'+'_'+ESP_NM+'_'+SRC_DB+'_'+SRCTBL_NM+'.out'
DATE_RANGE_OUTUPDT = '/hadoop/user/rajesh.thota/Test/TD_Files/'+'DATE_RANGE_UPDT'+'_'+ESP_NM+'_'+SRC_DB+'_'+SRCTBL_NM+'.out'

if not os.path.exists(TDCH_SCRIPTS_DIR):
    os.makedirs(TDCH_SCRIPTS_DIR)

#Clean up old TDCH scripts for this ESP name
apath = os.path.abspath(TDCH_SCRIPTS_DIR)
for file in os.listdir(TDCH_SCRIPTS_DIR):
    if ESP_NM in file:
        os.remove(apath+'/'+file)

#Create DDLs for all the tables in the master table.
OUTFILE_JOB = '/hadoop/user/hanumantharao.valluri/'+ESP_NM+'_jobdetail.out'
print OUTFILE_JOB
hive_cmd = "hive -e \"select * from td_auto.td_auto_base_text where esp_nm='"+ESP_NM+"' and run_flag='Y';\" >> "+OUTFILE_JOB
retcode = subprocess.call(hive_cmd, shell=True)
if retcode == 0:
    print "Hive query executed successfully"
else:
    print "Hive query failed .. logs can be found in HiveLogFile"

#Find the record count in the file
file_read = open(OUTFILE_JOB, 'r')
rec_count = sum(1 for row in file_read)
print rec_count
file_read.close()

#Call the TDCH creation functions based on the row count
if rec_count > 1:
    #The function below reads the file record by record and generates a TDCH script per table
    MultiTableLoad()
else:
    with open(OUTFILE_JOB) as f:
        line = f.read().split('|')
        (JOB_NM, SRC_DB, TGD_DB, SRCTBL_NM, TGTTBL_NM, PRIMARY_KEYS,
         LOAD_TYPE, LOAD_FREQ, SPLIT_TYPE, SPLIT_COL, SRC_PPI, WHR_COLS,
         TGT_PARTITIONS, TGT_PARTITION_TYPE, TGT_PARTITION_FMT,
         BUCKET_COLS, NUM_BUCKETS, TGT_DDL_FILE, RUN_FLAG, GRP_ID) = line[:20]
        WHR_COL = WHR_COLS.split(',')
        #Check if the LOAD_FREQ is 'H' and call the Date Range method if yes
        DateRangeFnCheck()

#Attempts at waiting for the PID status files written by the generated TDCH scripts
answer = subprocess.check_output(['./FileStatus.sh'])
print("the answer is {}".format(answer))

import shlex
param1 = '/hadoop/user/rajesh.thota/InputFile/PID_*'
subprocess.call(shlex.split('/hadoop/user/rajesh.thota/FileExists.sh '+param1))

retcode = subprocess.call(['sh', '/hadoop/user/rajesh.thota/FileExists.sh'])

============================= FileExists.sh ====================
i=0
value1=$1
echo $value1
until [ $i != 0 ]
do
  for filename in $value1
  do
    if [ -f "$filename" ]
    then
      file="${filename#*}"
      if [[ "$file" == $value1 ]]
      then
        echo $file >> PIDS.log
        i=1
      fi
    else
      echo "waiting For File"
      sleep 5s
    fi
  done
done

Python script
-------------
import subprocess
param1 = "/hadoop/user/rajesh.thota/InputFile/PID_*"
retcode = subprocess.call(['sh', '/hadoop/user/rajesh.thota/FileExists.sh', param1])
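Python alternative (sketch)
---------------------------
# Hedged sketch, not part of the original scripts: the same "wait until a
# matching PID_* file shows up" loop as FileExists.sh, done directly in Python
# with glob + time.sleep, so no external shell helper is needed. The function
# name and the poll interval are assumptions.
import glob
import time

def wait_for_files(pattern, poll_seconds=5):
    # Block until at least one file matches the glob pattern, then return the matches.
    while True:
        matches = glob.glob(pattern)
        if matches:
            return matches
        print "waiting For File"
        time.sleep(poll_seconds)

# e.g. pids = wait_for_files('/hadoop/user/rajesh.thota/InputFile/PID_*')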