Carreira Dev

14 nov, 2023

Lambda: implementando com GitLab CI/CD e Terraform para Integração SFTP, S3 e Databricks em Go

Publicidade

Tive uma necessidade em um cliente de reduzir o custo de processos que rodavam no databricks, uma das features que o databricks era responsável era de coletar os arquivos de vários sftp, descompactalos e colocá-los no datalake.

A automação de fluxos de trabalho de dados é um componente crucial na engenharia de dados moderna. Neste artigo, exploraremos como criar uma função AWS Lambda usando GitLab CI/CD e Terraform, que permite a uma aplicação em Go conectar-se a um servidor SFTP, coletar arquivos, armazená-los no Amazon S3 e, por fim, acionar um job no Databricks. Este processo end-to-end é essencial para sistemas que dependem de integração e automação de dados eficientes.

Lambda

O que você vai precisar para esse artigo:

  • Conta no GitLab com um repositório para o projeto.
  • Conta na AWS com permissões para criar recursos Lambda, S3 e IAM.
  • Conta no Databricks com permissões para criar e executar jobs.
  • Conhecimento básico em Go, Terraform e GitLab CI/CD.

LEIA MAIS: Importação e geração de Log de arquivos massivos com Flask + Map + lambda e multiprocessing do Python

Passo 1: Preparando a Aplicação em Go

Comece criando uma aplicação em Go que se conectará ao servidor SFTP para coletar arquivos. Utilize pacotes como github.com/pkg/sftp para estabelecer a conexão SFTP e github.com/aws/aws-sdk-go para interagir com o serviço S3 da AWS.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/pkg/sftp"
	"golang.org/x/crypto/ssh"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	// Configuração do cliente SFTP
	user := "seu_usuario_sftp"
	pass := "sua_senha_sftp"
	host := "endereco_sftp:22"
	config := &ssh.ClientConfig{
		User: user,
		Auth: []ssh.AuthMethod{
			ssh.Password(pass),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
	}

	// Conectar ao servidor SFTP
	conn, err := ssh.Dial("tcp", host, config)
	if err != nil {
		log.Fatal(err)
	}
	client, err := sftp.NewClient(conn)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Baixar arquivos do SFTP
	remoteFilePath := "/path/to/remote/file"
	localDir := "/path/to/local/dir"
	localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
	dstFile, err := os.Create(localFilePath)
	if err != nil {
		log.Fatal(err)
	}
	defer dstFile.Close()

	srcFile, err := client.Open(remoteFilePath)
	if err != nil {
		log.Fatal(err)
	}
	defer srcFile.Close()

	if _, err := srcFile.WriteTo(dstFile); err != nil {
		log.Fatal(err)
	}

	fmt.Println("Arquivo baixado com sucesso:", localFilePath)

	// Configuração do cliente S3
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-west-2"),
	}))
	uploader := s3manager.NewUploader(sess)

	// Carregar arquivo para o S3
	file, err := os.Open(localFilePath)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("seu-bucket-s3"),
		Key:    aws.String(filepath.Base(localFilePath)),
		Body:   file,
	})
	if err != nil {
		log.Fatal("Falha ao carregar arquivo para o S3:", err)
	}

	fmt.Println("Arquivo carregado com sucesso no S3")

LEIA MAIS: Utilizando o pattern Heartbeats em Golang

Lambda Passo 2: Configurando o Terraform

O Terraform será usado para provisionar a função Lambda e os recursos necessários na AWS. Crie um arquivo main.tf com a configuração necessária para criar a função Lambda, as políticas de IAM e os buckets do S3.

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "A policy that allows a lambda function to access S3 and SFTP resources"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
        ],
        Effect = "Allow",
        Resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
      // Inclua outras permissões conforme necessário
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      SFTP_HOST     = "endereco_sftp",
      SFTP_USER     = "seu_usuario_sftp",
      SFTP_PASSWORD = "sua_senha_sftp",
      S3_BUCKET     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}

LEIA MAIS: Utilizando lambda function com Go

Lambda Passo 3: Configurando o GitLab CI/CD

No GitLab, defina o pipeline CI/CD no arquivo .gitlab-ci.yml. Este pipeline deve incluir etapas para testar a aplicação Go, executar o Terraform para provisionar a infraestrutura e uma etapa para limpeza, se necessário.

stages:
  - test
  - build
  - deploy

variables:
  S3_BUCKET: "seu-bucket-s3"
  AWS_DEFAULT_REGION: "us-east-1"
  TF_VERSION: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$TF_VERSION
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production

LEIA MAIS: Microsserviços em Go com Prometheus utilizando Rabbitmq e Postgresql

Passo 4: Integrando com o Databricks

Após o upload dos arquivos para o S3, a função Lambda deve acionar um job no Databricks. Isso pode ser feito utilizando a API do Databricks para iniciar jobs existentes

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
	JobID int `json:"job_id"`
}

// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
	url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
	requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
	}

	return nil
}

func main() {
	// ... (código existente para conectar ao SFTP e carregar no S3)

	// Substitua pelos seus valores reais
	databricksInstance := "your-databricks-instance"
	databricksToken := "your-databricks-token"
	databricksJobID := 123 // ID do job que você deseja acionar

	// Acionar o job no Databricks após o upload para o S3
	err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
	if err != nil {
		log.Fatal("Erro ao acionar o job do Databricks:", err)
	}

	fmt.Println("Job do Databricks acionado com sucesso")
}

LEIA MAIS: 7Masters Programação Funcional – Expressões Condicionais com cálculo Lambda

Passo 5: Executando o Pipeline

Faça o push do código para o repositório GitLab para que o pipeline seja executado. Verifique se todos os passos são concluídos com sucesso e se a função Lambda está operacional e interagindo corretamente com o S3 e o Databricks.

uma vez que você tenha o código completo e o arquivo .gitlab-ci.yml configurado, você pode executar o pipeline seguindo estes passos:

  1. Faça o push do seu código para o repositório GitLab:

git add .
git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
git push origin master

 

  1. O GitLab CI/CD detectará o novo commit e iniciará o pipeline automaticamente.
  2. Acompanhe a execução do pipeline no GitLab acessando a seção CI/CD do seu repositório.
  3. Se todos os estágios forem bem-sucedidos, sua função Lambda será implantada e pronta para ser usada.

 

Lembre-se de que você precisará configurar as variáveis de ambiente no GitLab CI/CD para armazenar informações sensíveis, como tokens de acesso e chaves privadas. Isso pode ser feito na seção ‘Settings’ > ‘CI / CD’ > ‘Variables’ do seu projeto GitLab.

Além disso, certifique-se de que o token do Databricks tenha as permissões necessárias para acionar jobs e que o job exista com o ID fornecido.

Conclusão Lambda:

A automação de tarefas de engenharia de dados pode ser significativamente simplificada com o uso de ferramentas como GitLab CI/CD, Terraform e AWS Lambda. Ao seguir os passos descritos neste artigo, você pode criar um sistema robusto que automatiza a coleta e integração de dados entre SFTP, S3 e Databricks, tudo isso com a eficiência e a simplicidade da linguagem Go. Com essa abordagem, você estará bem equipado para lidar com os desafios de integração de dados em escala.