"""Print diagnostic row counts for the pendencia / empresas / banco / TVCG left joins.

Loads RS_PENDENCIA_FINAN and left-joins it, one table at a time, against:
  1. SC_EMPRESAS_DOCTOS, filtered to CO_STATUS_SCE = 'A' and selected activities;
  2. UC_TVBANCO;
  3. one FS_TVCG ("HDRPJ", type_log=open) partition, filtered on TEM_BLOQUEIO.
The row count of every input and intermediate result is printed.  Because
every join is a LEFT join, no pendencia row is dropped; a final count larger
than the original pendencia count means some join key matched more than one
row on the right-hand side.

The FS_TVCG partition date defaults to 2020-09-05 and can be overridden by
setting the DATA_IMPORTACAO environment variable (e.g. DATA_IMPORTACAO=2020-09-06).

NOTE(review): every ``count()`` is a separate Spark action that re-executes the
whole lineage of its DataFrame (earlier joins included).  ``StorageLevel`` is
imported but unused; persisting the intermediates would avoid the recomputation
at the cost of executor memory.
"""
import os
import sys  # noqa: F401  (currently unused)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim
from pyspark.storagelevel import StorageLevel  # noqa: F401  (currently unused)

SEPARATOR = "====================================="

# Partition of FS_TVCG to read. An unset or empty variable falls back to the
# original hard-coded date, so existing runs read exactly the same path.
DATA_IMPORTACAO = os.environ.get("DATA_IMPORTACAO") or "2020-09-05"
HDRPJ_PATH = (
    "/dados/chronos/raw/FS_TVCG.parquet/data_importacao={}/type_log=open"
    .format(DATA_IMPORTACAO)
)


def _print_count(df):
    """Print the row count of *df* (runs a Spark job), then the separator line."""
    print(df.count())
    print(SEPARATOR)


def _left_join(left, right, left_key, right_key):
    """Return *left* LEFT JOIN *right* ON left[left_key] == right[right_key].

    Both sides are aliased "a" and "b" before joining, as the original
    inline code did.
    """
    a = left.alias("a")
    b = right.alias("b")
    return a.join(b, a[left_key] == b[right_key], how="left")


spark = SparkSession.builder.getOrCreate()

# --- SC_EMPRESAS_DOCTOS ---------------------------------------------------
empDoctos = spark.read.format('delta').load('/dados/chronos/stream/raw/SC_EMPRESAS_DOCTOS.delta')
print("=====================================APLICANDO TRIM SC EMPRESA DOCTOS CO_ATIVIDADE_SCE CO_STATUS_SCE")
# Trim so surrounding spaces don't defeat the literal comparisons in the filter below.
empDoctos = empDoctos.withColumn("CO_ATIVIDADE_SCE", trim(col("CO_ATIVIDADE_SCE")))
empDoctos = empDoctos.withColumn("CO_STATUS_SCE", trim(col("CO_STATUS_SCE")))
print("=====================================COUNT EMPDOCTOS")
_print_count(empDoctos)

# --- UC_TVBANCO -------------------------------------------------------------
uctvbanco = spark.read.format('delta').load('/dados/chronos/stream/raw/UC_TVBANCO.delta')
print("=====================================COUNT UCTVBANCO")
_print_count(uctvbanco)

# --- FS_TVCG (HDRPJ), single partition -------------------------------------
hdrpj = spark.read.format('parquet').load(HDRPJ_PATH)
print("===================================== COUNT HDRPJ OPEN")
hdrpj = hdrpj.withColumn("TEM_BLOQUEIO", trim(col("TEM_BLOQUEIO")))
_print_count(hdrpj)

# --- RS_PENDENCIA_FINAN (left side of every join) ---------------------------
print("=====================================COUNT PENDENCIA")
pendencia = spark.read.format('delta').load('/dados/chronos/stream/raw/RS_PENDENCIA_FINAN.delta')
_print_count(pendencia)

# --- Filters ------------------------------------------------------------------
print("===================================== FILTRANDO EMPRESAS DOCTOS")
empDoctos_filtrada = empDoctos.where(
    "CO_STATUS_SCE = 'A' AND CO_ATIVIDADE_SCE IN ('TELEFIXA', 'SEGURADORA', 'FINANCEIRA')"
)
print("===================================== COUNT EMPRESAS DOCTOS FILTRADA")
_print_count(empDoctos_filtrada)

print("===================================== FILTRANDO HDRPJ")
# NULL must be admitted explicitly: in SQL, "NULL <> 'S'" is NULL (not true),
# so the first predicate alone would drop rows with no TEM_BLOQUEIO value.
hdrpj_filtrada = hdrpj.where("TEM_BLOQUEIO <> 'S' or TEM_BLOQUEIO is null")
print("===================================== COUNT HDRPJ FILTRADA")
_print_count(hdrpj_filtrada)

# --- Joins --------------------------------------------------------------------
print("===================================== PRIMEIRO JOIN COM EMPRESADOCTOS")
result_joinEmpDoctos = _left_join(pendencia, empDoctos_filtrada, "NU_CGCFINAN_CFI", "NU_DOCUMENTO_SCE")
print("===================================== PRIMEIRO JOIN COM EMPRESADOCTOS COUNT")
_print_count(result_joinEmpDoctos)

print("===================================== SEGUNDO JOIN COM UCTVBANCO")
result_joinUCTVBANCO = _left_join(result_joinEmpDoctos, uctvbanco, "NU_CGCFINAN_CFI", "NU_CGC_BCO")
print("===================================== SEGUNDO JOIN COM UCTVBANCO COUNT")
_print_count(result_joinUCTVBANCO)

print("===================================== TERCEIRO JOIN COM TVCG/HDRPJ")
result_final = _left_join(result_joinUCTVBANCO, hdrpj_filtrada, "NU_CGCFINAN_CFI", "DOCUMENTO_CAD")
print("===================================== TERCEIRO JOIN COM TVCG COUNT FINAL")
print(result_final.count())
# Left joins keep every pendencia row, so a final count above this value
# means at least one join matched multiple right-hand rows for a key.
print("===================================== VALOR DA PENDENCIA ORIGINAL")
print(pendencia.count())
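# NOTE: the lines below appear to be user-interface text from the Python
# Fiddle web page this script was pasted from. They are not code and are
# kept only as comments so the file remains valid Python.
# Run
# Reset
# Share
# Import
# Link
# Embed
# Language▼
# English
# 中文
# Python Fiddle
# Python Cloud IDE
# Follow @python_fiddle
# Browser Version Not Supported
# Due to Python Fiddle's reliance on advanced JavaScript techniques, older browsers might have problems running it correctly. Please download the latest version of your favourite browser.
# Chrome 10+
# Firefox 4+
# Safari 5+
# IE 10+
# Let me try anyway!
# url:
# Go
# Python Snippet
# Stackoverflow Question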