The elegance of Airflow + the power of AWS

Orkestra

The elegance of Airflow + the power of AWS.

examples/hello_orkestra.py

import random
from typing import *
from uuid import uuid4

from aws_lambda_powertools import Logger, Tracer
from pydantic import BaseModel

from orkestra import compose
from orkestra.interfaces import Duration

def dag():
    """Chain the example tasks together with the ``>>`` operator.

    The chain is evaluated strictly left to right, so each stage below is
    the result of applying ``>>`` to the previous stage and the next
    operand. Nothing is returned; the wiring happens through whatever
    ``>>`` does on the task objects.

    NOTE(review): one group is passed as a tuple and the other as a list.
    Presumably the two container types mean different things to orkestra;
    confirm before normalizing them.
    """
    priced = generate_item >> add_price >> copy_item >> double_price
    # Tuple group: kept as a tuple on purpose (see NOTE above).
    after_checks = priced >> (do_nothing, assert_false)
    greeted = after_checks >> say_hello
    # List group: kept as a list on purpose (see NOTE above).
    after_randoms = greeted >> [random_int, random_float]
    after_randoms >> say_goodbye

class Item(BaseModel):
    """Model of an item with an id, a name and an optional price."""

    # Identifier; ``random()`` fills it with a stringified UUID4.
    id: str
    # Display name of the item.
    name: str
    # Price is optional and defaults to None when not supplied.
    price: Optional[float] = None

    @classmethod
    def random(cls):
        """Build an instance with a fresh UUID4 id and a randomly chosen name.

        ``price`` is not passed, so it keeps its default of ``None``.
        Inside this method ``random`` refers to the module-level ``random``
        import, not to this classmethod.
        """
        candidate_names = [
            "potato",
            "moon rock",
            "hat",
        ]
        new_id = str(uuid4())
        chosen_name = random.choice(candidate_names)
        return cls(id=new_id, name=chosen_name)

# Module-level Powertools helpers used by the task functions in this file.
logger = Logger()
tracer = Tracer()

# Keyword arguments unpacked into every ``@compose(...)`` decorator in this
# file, so all tasks get the same powertools setting and a 6-second timeout.
default_args = {
    "enable_powertools": True,
    "timeout": Duration.seconds(6),
}

@compose(**default_args)
def generate_item(event, context):
    """Create a random ``Item`` and return it as a plain dict.

    ``event`` and ``context`` are accepted but not read by this task.
    The generated dict is logged, and a "SUCCESS" value is stored under
    the "GenerateItem" tracer metadata key before returning.
    """
    logger.info("generating random item")
    random_item = Item.random()
    payload = random_item.dict()
    logger.info(payload)
    tracer.put_metadata("GenerateItem", "SUCCESS")
    return payload

@compose(model=Item, **default_args)
def add_price(item: Item, context):
price = 3.14
logger.info(
"adding price to item",
extra={
"item": item.dict(),

 

 

 

To finish reading, please visit the source site.