Advanced Python Highlights
In several places, I have developed functions and classes that showcase what we learned in this course. A small sample follows.
Luigi
Problem statement: Luigi decides whether a task is complete by checking whether its output file exists. This leaves a gap for databases. Luigi partly closes it with a SQLAlchemy target, which supports inserting rows into a database. There is still no built-in way, however, to run a task based on the value of a row or column, or based on whether any rows match a filter. As an example, in the Warmmail project I needed to identify the subscribers who should be sent an email at a given time. To keep this scalable, I store a “Next Email Datetime” in the database, and the relevant task should run whenever any such datetime is now in the past.
Solution: Custom descriptor + target which checks if there are any results for a Django filter.
Target Code
This class inherits from Luigi’s Target class and overrides the exists method.
from luigi import Target


class RowFilterTarget(Target):
    """A target class for filters on rows.

    Checks whether any rows satisfy the given filter.
    Returns True (task is complete) if no rows match, else False.
    False makes Luigi treat the task as pending, so it checks the
    requirements and runs the task.
    """

    def __init__(self, model, **kwargs):
        self.model = model  # Django model class to query
        self.kwargs = kwargs  # field lookups passed to .filter()

    def exists(self):
        # Complete only when no rows match the filter.
        return not self.model.objects.filter(**self.kwargs).exists()
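To make the inverted completion logic concrete, here is a minimal sketch that exercises the same check without Luigi or Django installed. `FakeManager`, `FakeSubscription`, and the `due` field are hypothetical stand-ins for a Django model, and the target shown here is a simplified copy that omits the `Target` base class.

```python
class FakeManager:
    """Hypothetical stand-in for a Django model manager."""

    def __init__(self, rows):
        self.rows = rows

    def filter(self, **kwargs):
        # Keep rows whose fields equal every keyword argument (exact match only).
        return [
            row
            for row in self.rows
            if all(row.get(key) == value for key, value in kwargs.items())
        ]


class FakeSubscription:
    """Hypothetical model: one subscriber is due for an email, one is not."""

    objects = FakeManager(
        [
            {"email": "a@example.com", "due": True},
            {"email": "b@example.com", "due": False},
        ]
    )


class RowFilterTarget:
    """Simplified copy of the target, without the Luigi base class."""

    def __init__(self, model, **kwargs):
        self.model = model
        self.kwargs = kwargs

    def exists(self):
        # No matching rows means there is nothing to do, so the task is complete.
        return not self.model.objects.filter(**self.kwargs)


pending = RowFilterTarget(FakeSubscription, due=True)
done = RowFilterTarget(FakeSubscription, email="nobody@example.com")

print(pending.exists())  # False: a matching row exists, so Luigi would run the task
print(done.exists())  # True: no matching rows, so the task counts as complete
```

The point of the inversion is that matching rows represent outstanding work, so their presence has to read as "output missing" to the scheduler.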
Output Descriptor Code
This class runs the filter against the Django database.
An additional feature of this class is that it supports returning the relevant fields (as requested in the arguments) to the parent task class. These fields can then be passed on to upstream tasks via the “requires” method, so upstream tasks do not need to query the database again.
from functools import partial


class RowFilterOutput:
    """Descriptor for the output method.

    Returns a "RowFilterTarget" for the Luigi task.
    Additional feature: if the filter returns rows, the descriptor can
    accept field names and parameter names on the parent class and update
    the parent class parameters, so that the tasks it requires do not
    need to query the database again.
    """

    def __init__(self, model, entries_param=None, field=None, **kwargs):
        self.model = model
        # Map parent-task attribute names to model field names.
        # Leave the mapping empty when no names are given.
        if entries_param is None or field is None:
            self.parent_updates = {}
        else:
            entries_param = (
                entries_param if isinstance(entries_param, list) else [entries_param]
            )
            field = field if isinstance(field, list) else [field]
            self.parent_updates = dict(zip(entries_param, field))
        self.kwargs = kwargs

    def __get__(self, task, cls):
        if task is None:
            # Accessed on the class rather than on an instance.
            return self
        return partial(self.__call__, task)

    def __call__(self, task):
        vals = self.model.objects.filter(**self.kwargs)
        if vals and self.parent_updates:
            for entry, field in self.parent_updates.items():
                # Store the distinct field values on the parent task.
                setattr(task, entry, tuple(set(getattr(v, field) for v in vals)))
        return RowFilterTarget(self.model, **self.kwargs)
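The descriptor mechanics can be seen in isolation with a small sketch. `CallableOutput` and `DemoTask` are hypothetical names used only for illustration; a real task would subclass `luigi.Task`, and the real descriptor returns a target rather than a string.

```python
from functools import partial


class CallableOutput:
    """Hypothetical descriptor that turns a class attribute into a bound callable."""

    def __init__(self, label):
        self.label = label

    def __get__(self, task, cls):
        if task is None:
            # Accessed on the class itself: return the descriptor object.
            return self
        # Accessed on an instance: bind the instance as the first argument.
        return partial(self.__call__, task)

    def __call__(self, task):
        # Side effect on the owning task, mirroring the parent-update idea.
        task.seen = self.label
        return f"{type(task).__name__}:{self.label}"


class DemoTask:
    """Hypothetical task class used only for this demonstration."""

    output = CallableOutput("rows")


task = DemoTask()
result = task.output()  # reads like an ordinary method call

print(result)  # DemoTask:rows
print(task.seen)  # rows
print(isinstance(DemoTask.output, CallableOutput))  # True
```

Because `__get__` hands back a callable bound to the instance, code that expects `task.output()` to be a method keeps working, while the descriptor can be configured once at class-definition time.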
Django: Models
One interesting feature of Django models used in this project is the enum abstraction, i.e. Django’s own way of restricting a database field to a fixed set of “choices”.
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _


class Subscription(models.Model):
    """
    The main subscription model with below fields.
    Constraints: Only 1 subscription allowed per email + city combination.
    """

    email = models.EmailField()
    verified = models.BooleanField(default=False)
    temp_token = models.CharField(max_length=24)
    city = models.CharField(max_length=100)
    dominentpol = models.CharField(max_length=100)
    # Pass the callable itself so the default is evaluated each time a row
    # is created, not once when the module is imported.
    next_email_date = models.DateTimeField(default=timezone.now)
    created_date = models.DateTimeField(default=timezone.now)
    update_date = models.DateTimeField(default=timezone.now)

    class Meta:
        unique_together = (
            "email",
            "city",
        )

    class Status(models.TextChoices):
        ACTIVE = "A", _("Active")
        INACTIVE = "I", _("Inactive")

    status = models.CharField(
        max_length=1, choices=Status.choices, default=Status.ACTIVE
    )

    def __str__(self):
        return self.email + " -> " + self.city
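As a rough illustration of what the choices abstraction provides, the sketch below approximates it with the standard library's `enum` module so that it runs without Django. This is an approximation, not Django's implementation: in Django, `choices` is exposed as a class property on `TextChoices`, whereas here it is written as a classmethod for simplicity.

```python
from enum import Enum


class Status(str, Enum):
    """Stdlib approximation of a TextChoices-style enum with labels."""

    def __new__(cls, value, label):
        obj = str.__new__(cls, value)
        obj._value_ = value  # the short code stored in the database column
        obj.label = label  # the human-readable name shown to users
        return obj

    ACTIVE = "A", "Active"
    INACTIVE = "I", "Inactive"

    @classmethod
    def choices(cls):
        # (value, label) pairs, the shape a field's choices option expects.
        return [(member.value, member.label) for member in cls]


print(Status.choices())  # [('A', 'Active'), ('I', 'Inactive')]
print(Status("A") is Status.ACTIVE)  # True: lookup by stored value
print(Status.ACTIVE == "A")  # True: members compare equal to their stored string
```

Keeping the stored value short ("A", "I") while attaching a readable label is what lets the column stay one character wide without losing meaning in forms and admin screens.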
Django: Rendering
As part of this project, there were 2 rendering challenges to be solved:
Template rendering for emails - For this, I have used Django’s “render_to_string” functionality
Render plots as images - For this, I have used the “kaleido” library from Plotly
import urllib.parse
from base64 import b64encode

import pandas as pd
import plotly.express as px
from django.template.loader import render_to_string
from plotly.io import to_image


def run(self):
    # Method of the Luigi task that writes the newsletter HTML.
    city = urllib.parse.unquote(self.city)
    df = pd.read_parquet(self.historical.output().path)
    df = df[df["City"] == city].sort_index(ascending=False)
    df = df[df["Specie"].isin(["pm10", "pm25"])]
    # Pivot on the existing index: one column per pollutant.
    df = df.pivot(columns="Specie", values="median")
    df.fillna(0, inplace=True)
    df.sort_index(inplace=True, ascending=False)
    last_7_days = df.iloc[:7]  # newest first, so the first 7 rows

    data = {"aqi": df.iloc[0][self.pol]}
    df["month"] = df.index.strftime("%Y-%m")

    df_month = df.groupby("month").agg("mean")

    last_7_days_bar = px.bar(last_7_days, title="Last 7 Days", barmode="group")
    month_bar = px.bar(df_month, title="Monthly", barmode="group")

    # Render each plot to PNG bytes and embed it as base64 text.
    data["image_last_7_days"] = b64encode(
        to_image(last_7_days_bar, format="png", engine="kaleido")
    ).decode()
    data["image_months"] = b64encode(
        to_image(month_bar, format="png", engine="kaleido")
    ).decode()
    html = render_to_string(
        "subscribe/newsletter_email_template.html", {"data": data}
    )
    with open(self.output().path, "w") as f:
        f.write(html)
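The base64 step is what allows a rendered plot to travel inside the email HTML itself. A minimal sketch of that idea follows, using only the standard library. The placeholder bytes stand in for the PNG that `to_image` would return, and the `<img>` fragment is a hypothetical example, since the actual email template is not shown here; `string.Template` stands in for Django's template engine.

```python
from base64 import b64decode, b64encode
from string import Template

# Placeholder bytes standing in for the PNG that to_image would return.
png_bytes = b"\x89PNG\r\n\x1a\n" + b"placeholder image data"

# b64encode returns bytes; decode() turns them into text that can sit in HTML.
encoded = b64encode(png_bytes).decode()

# Hypothetical template fragment that embeds the image as a data URI.
template = Template('<img alt="Last 7 Days" src="data:image/png;base64,$image">')
html = template.substitute(image=encoded)

print(html[:60])

# Decoding the embedded text gives back the original bytes unchanged.
assert b64decode(encoded) == png_bytes
```

Embedding the image this way avoids hosting the plot at a separate URL, at the cost of a larger message body.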