تدفق الهواء: نصائح أقل شهرة ، والخدع ، وأفضل الممارسات

هناك بعض الأشياء مع كل الأدوات التي تستخدمها والتي لن تعرفها حتى بعد استخدامها لفترة طويلة. وبمجرد أن تعرف ذلك ، فأنت مثل "أتمنى أن أعرف هذا من قبل" لأنك أخبرت عميلك بالفعل أنه لا يمكن القيام بذلك بأي طريقة أفضل. لا يختلف تدفق الهواء مثل الأداة الأخرى ، فهناك بعض الأحجار الكريمة المخفية التي يمكن أن تجعل حياتك سهلة وتجعل تطوير DAG ممتعًا.

قد تعرف بالفعل بعضًا منهم ، وإذا كنت تعرفهم جميعًا - حسنًا فأنت محترف بعد ذلك.

(1) DAG مع مدير السياق

هل انزعجت من نفسك عندما نسيت إضافة dag = dag إلى مهمتك وخطأ Airflow؟ نعم ، من السهل أن ننسى إضافته لكل مهمة. من الضروري أيضًا إضافة المعلمة نفسها كما هو موضح في المثال التالي (ملف example_dag.py):

المثال (ملف example_dag.py) أعلاه يحتوي فقط على مهمتين ، ولكن إذا كان لديك 10 أو أكثر ، يصبح التكرار أكثر وضوحًا. لتجنب ذلك ، يمكنك استخدام Airflow DAGs كمديري السياق لتعيين المشغلين الجدد تلقائيًا إلى DAG كما هو موضح في المثال أعلاه (example_dag_with_context.py) باستخدام عبارة.

(2) استخدام قائمة لتعيين تبعيات المهام

عندما تريد إنشاء DAG على غرار ما هو موضح في الصورة أدناه ، يجب عليك تكرار أسماء المهام عند تعيين تبعيات المهام.

كما هو موضح في مقتطف الشفرة أعلاه ، فإن استخدام الطريقة المعتادة لدينا لتحديد تبعيات المهام يعني أن تكرر المهمة وتنتهي 3 مرات. يمكن استبدال هذا باستخدام قوائم python لتحقيق نفس النتيجة بطريقة أكثر أناقة.

(3) استخدم الوسائط الافتراضية لتجنب تكرار الوسائط

تدفق الهواء يسمح بتمرير قاموس من المعلمات التي ستكون متاحة لجميع المهمة في هذا DAG.

على سبيل المثال ، في DataReply ، نستخدم BigQuery لجميع DAGs ذات الصلة بـ DataWareshouse وبدلاً من تمرير معلمات مثل labels و bigquery_conn_id لكل مهمة ، فإننا ببساطة نمررها indefault_args القاموس كما هو موضح في DAG أدناه.

يعد هذا مفيدًا أيضًا عندما تريد التنبيهات على فشل المهام الفردية بدلاً من إخفاقات DAG فقط والتي أشرت إليها بالفعل في مدونتي الأخيرة على دمج تنبيهات Slack في Airflow.

(4) حجة "المعلمات"

"params" هو قاموس لمعلمات مستوى DAG التي يمكن الوصول إليها في القوالب. يمكن التغلب على هذه المعاملات على مستوى المهمة.

هذه حجة مفيدة للغاية ولقد كنت شخصياً أستخدمها كثيرًا حيث يمكن الوصول إليها في حقل templated مع temp jinja باستخدام params.param_name. مثال للاستخدام كالتالي:

يسهل عليك كتابة DAG ذات معلمات بدلاً من قيم الترميز الثابت. كما هو موضح في الأمثلة أعلاه يمكن تعريف القاموس params في 3 أماكن: (1) في كائن DAG (2) في default_args القاموس (3) كل مهمة.

(5) تخزين البيانات الحساسة في الاتصالات

يدرك معظم المستخدمين هذا ولكن ما زلت أرى كلمات المرور المخزنة بنص عادي داخل DAG. من أجل الخير - لا تفعل ذلك. يجب عليك كتابة DAGs بطريقة تثق بها بما يكفي لتخزين DAGs الخاصة بك في مستودع عام.

بشكل افتراضي ، سيحفظ Airflow كلمات المرور الخاصة بالاتصال بنص عادي داخل قاعدة بيانات البيانات الأولية. يوصى بشدة باستخدام حزمة التشفير أثناء تثبيت Airflow ويمكن أن يتم ذلك ببساطة عن طريق تثبيت apache-airflow [crypto].

يمكنك بعد ذلك الوصول إليه بسهولة كما يلي:

من airflow.hooks.base_hook استيراد BaseHook
slack_token = BaseHook.get_connection ('slack'). password

(6) تقييد عدد متغيرات تدفق الهواء في DAG الخاص بك

يتم تخزين متغيرات Airflow في قاعدة بيانات البيانات الأولية ، لذلك فإن أي استدعاء للمتغيرات سيعني اتصالاً بـ Metadata DB. يتم تحليل ملفات DAG الخاصة بك كل X ثانية. استخدام عدد كبير من المتغيرات في DAG (والأسوأ في default_args) قد يعني أنك قد ينتهي تشبع عدد الاتصالات المسموح بها إلى قاعدة البيانات الخاصة بك.

لتجنب هذا الموقف ، يمكنك فقط استخدام متغير Airflow واحد بقيمة JSON. نظرًا لأن متغير Airflow يمكن أن يحتوي على قيمة JSON ، يمكنك تخزين كل تكوينات DAG داخل متغير واحد كما هو موضح في الصورة أدناه:

كما هو موضح في لقطة الشاشة هذه ، يمكنك إما تخزين القيم في متغيرات Airflow منفصلة أو تحت متغير Airflow واحد كحقل JSON

يمكنك بعد ذلك الوصول إليها كما هو موضح أدناه ضمن الطريقة الموصى بها:

(7) قاموس "السياق"

غالبًا ما ينسى المستخدمون محتويات قاموس السياق عند استخدام PythonOperator مع وظيفة قابلة للاستدعاء.

يحتوي السياق على مراجع للكائنات ذات الصلة بمثيل المهمة ويتم توثيقه تحت قسم وحدات الماكرو في واجهة برمجة التطبيقات لأنها متوفرة أيضًا للحقل المجدول.

{
      "داغ": task.dag ،
      'س': س ،
      'next_ds': next_ds ،
      'next_ds_nodash': next_ds_nodash ،
      'prev_ds': prev_ds ،
      'prev_ds_nodash': prev_ds_nodash ،
      'ds_nodash': ds_nodash ،
      'ts': ts ،
      'ts_nodash': ts_nodash ،
      'ts_nodash_with_tz': ts_nodash_with_tz ،
      'Yesterday_ds': Yesterday_ds ،
      'Yesterday_ds_nodash': Yesterday_ds_nodash ،
      "غدا": "غدا"
      'orrow_ds_nodash ': tomorrow_ds_nodash ،
      'END_DATE': س ،
      "end_date": ds ،
      'dag_run': dag_run ،
      'run_id': run_id ،
      'الإعدام_التاريخ': self.execution_date ،
      'prev_execution_date': prev_execution_date ،
      'next_execution_date': next_execution_date ،
      'latest_date': س ،
      "وحدات الماكرو": وحدات الماكرو ،
      "params": params ،
      "الجداول": الجداول ،
      "المهمة": المهمة ،
      'task_instance': الذات ،
      "تي": النفس ،
      'task_instance_key_str': ti_key_str،
      "أسيوط": التكوين ،
      'test_mode': self.test_mode ،
      'var': {
          "القيمة": VariableAccessor () ،
          'json': VariableJsonAccessor ()
      }،
      "مداخل": task.inlets ،
      "المنافذ": task.outlets ،
}

(8) توليد مهام تدفق الهواء الديناميكي

لقد أجبت على العديد من الأسئلة في StackOverflow حول كيفية إنشاء المهام الحيوية. الجواب بسيط ، كل ما تحتاج إليه هو إنشاء task_id فريد لجميع مهامك. فيما يلي مثالان حول كيفية تحقيق ذلك:

(9) قم بتشغيل "ترقية تدفق الهواء" بدلاً من "بدء تدفق الهواء"

شكراً لآش برلين على هذه النصيحة في حديثه في أول لقاء لـ Apache Airflow London Meetup.

سيؤدي إنشاء تدفق الهواء initdb إلى إنشاء جميع الاتصالات الافتراضية والمخططات وما إلى ذلك التي قد لا نستخدمها ولا نريدها في قاعدة بيانات الإنتاج الخاصة بنا. سيتم بدلاً من ذلك تطبيق أي تدفق مفقود على جدول قاعدة البيانات. (بما في ذلك إنشاء الجداول المفقودة وما إلى ذلك). كما أنه آمن للتشغيل في كل مرة ، ويتتبع عمليات الترحيل التي تم تطبيقها بالفعل (باستخدام الوحدة النمطية Alembic).

اسمحوا لي أن أعرف في قسم التعليقات أدناه ما إذا كنت تعرف شيئًا يستحق الإضافة في منشور المدونة هذا. تدفق هواء سعيد :-)