Data engineering teams are frequently tasked with building bespoke ingestion solutions for myriad custom, proprietary, or industry-specific data sources. Many teams find this work of building ingestion solutions cumbersome and time-consuming. Recognizing these challenges, we have interviewed numerous companies across different industries to better understand their diverse data integration needs. This feedback led us to the development of the Python Data Source API for Apache Spark™.
One of the customers we have worked closely with is Shell. Equipment failures in the energy sector can have significant consequences, impacting safety, the environment, and operational stability. At Shell, minimizing these risks is a priority, and one way they do this is by focusing on the reliable operation of equipment.
Shell owns a vast array of capital assets and equipment valued at over $180 billion. To manage the huge amounts of data that Shell's operations generate, they rely on advanced tools that enhance productivity and allow their data teams to work seamlessly across various projects. The Databricks Data Intelligence Platform plays a crucial role by democratizing data access and fostering collaboration among Shell's analysts, engineers, and scientists. However, integrating IoT data posed challenges for some use cases.
Using our work with Shell as an example, this blog explores how this new API addresses those challenges and provides example code to illustrate its use.

The challenge
First, let's look at the challenge that Shell's data engineers experienced. Although many of the data sources in their pipelines use built-in Spark sources (e.g., Kafka), some rely on REST APIs, SDKs, or other mechanisms to expose data to consumers. Shell's data engineers struggled with this. They ended up with bespoke solutions for joining data from built-in Spark sources with data from these other sources, which burned data engineers' time and energy. As is often seen in large organizations, such bespoke implementations introduce inconsistencies in both implementation and results. Bryce Bartmann, Shell's Chief Digital Technology Advisor, wanted simplicity, telling us, "We write a lot of cool REST APIs, including for streaming use cases, and would love to just use them as a data source in Databricks instead of writing all the plumbing code ourselves."
"We write a lot of cool REST APIs, including for streaming use cases, and would love to just use them as a data source in Databricks instead of writing all the plumbing code ourselves."
– Bryce Bartmann, Chief Digital Technology Advisor, Shell
The solution
The new Python custom data source API alleviates the pain by allowing the problem to be approached with object-oriented concepts. The new API provides abstract classes that let custom code, such as REST API-based lookups, be encapsulated and surfaced as just another Spark source or sink.
Data engineers want simplicity and composability. For instance, imagine you are a data engineer and want to ingest weather data in your streaming pipeline. Ideally, you want to write code that looks like this:
df = spark.readStream.format("weather").load()
That code looks simple, and it is easy for data engineers to use because they are already familiar with the DataFrame API. Previously, a common approach to accessing a REST API from a Spark job was to use a Pandas UDF. This article shows how complicated it can be to write reusable code capable of sinking data to a REST API using a Pandas UDF. The new API, on the other hand, simplifies and standardizes how Spark jobs – streaming or batch, sink or source – work with non-native sources and sinks.
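For contrast, the sketch below shows roughly what that older Pandas UDF pattern can look like. The endpoint URL and column names are hypothetical, and the snippet is only meant to illustrate the plumbing the new API removes.

# A rough sketch of the older Pandas UDF approach: the REST lookup is hand-rolled
# inside a UDF and applied column by column (hypothetical endpoint and columns).
import pandas as pd
import requests
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def fetch_weather_udf(lat: pd.Series, long: pd.Series) -> pd.Series:
    session = requests.Session()  # reuse one connection per batch
    results = []
    for la, lo in zip(lat, long):
        response = session.get(f"https://example.com/weather?location={la},{lo}")
        results.append(response.text)
    return pd.Series(results)

# Applied to an existing DataFrame with latitude/longitude columns:
# df = df.withColumn("weather", fetch_weather_udf("latitude", "longitude"))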
Next, let's examine a real-world example and show how the new API lets us create a new data source ("weather" in this example). The new API provides capabilities for sources, sinks, batch, and streaming; the example below focuses on using the new streaming API to implement a new "weather" source.
Using the Python Data Source API – a real-world scenario
Imagine you are a data engineer tasked with building a data pipeline for a predictive maintenance use case that requires pressure data from wellhead equipment. Let's assume the wellhead's temperature and pressure metrics flow through Kafka from the IoT sensors. We know Structured Streaming has native support for processing data from Kafka. So far, so good. However, the business requirements present a challenge: the same data pipeline must also capture the weather data for the wellhead site, and that data happens not to be streaming through Kafka – it is instead accessible via a REST API. The business stakeholders and data scientists know that weather impacts the lifespan and efficiency of equipment, and those factors influence equipment maintenance schedules.
Start simple
The new API provides a simple option suitable for many use cases: the SimpleDataSourceStreamReader API. The SimpleDataSourceStreamReader API is appropriate when the data source has low throughput and does not require partitioning. We will use it in this example because we only need weather readings for a limited number of wellhead sites, and the frequency of those readings is low.
Let's look at a simple example that uses the SimpleDataSourceStreamReader API.
We will explain a more sophisticated approach later. That other, more complex approach is ideal when building a partition-aware Python Data Source. For now, we won't worry about what that means. Instead, we will show an example that uses the simple API.
Code example
The code example below assumes that the "simple" API is sufficient. The __init__ method is essential because that is how the reader class (WeatherSimpleStreamReader below) learns which wellhead sites we need to monitor. The class uses a "locations" option to identify the locations for which to emit weather information.
import ast
import json

import requests

from pyspark.sql.datasource import SimpleDataSourceStreamReader
from pyspark.sql.types import StructType


class WeatherSimpleStreamReader(SimpleDataSourceStreamReader):

    def initialOffset(self):
        """
        Returns the initial offset for reading, which serves as the starting
        point for the streaming data source.

        The initial offset is returned as a dictionary where each key is a
        unique identifier for a specific (latitude, longitude) pair, and each
        value is a timestamp string (in ISO 8601 format) representing the point
        in time from which data should start being read.

        Example:
            For locations [(37.7749, -122.4194), (40.7128, -74.0060)], the
            offset might look like:
            {
                "offset_37.7749_-122.4194": "2024-09-01T00:00:00Z",
                "offset_40.7128_-74.0060": "2024-09-01T00:00:00Z"
            }
        """
        return {f"offset_{lat}_{long}": "2024-09-01T00:00:00Z"
                for (lat, long) in self.locations}

    @staticmethod
    def _parse_locations(locations_str: str):
        """Converts a string representation of a list of tuples to an actual
        list of tuples."""
        return [tuple(map(float, x)) for x in ast.literal_eval(locations_str)]

    def __init__(self, schema: StructType, options: dict):
        """Initialize with schema and options."""
        super().__init__()
        self.schema = schema
        self.locations = self._parse_locations(options.get("locations", "[]"))
        self.api_key = options.get("apikey", "")
        self.current = 0
        self.frequency = options.get("frequency", "minutely")
        self.session = requests.Session()  # Use a session for connection pooling

    def read(self, start: dict):
        """Reads data starting from the given offset."""
        data = []
        new_offset = {}
        for lat, long in self.locations:
            start_ts = start[f"offset_{lat}_{long}"]
            weather = self._fetch_weather(lat, long, self.api_key, self.session)[self.frequency]
            for entry in weather:
                # Start time is exclusive and end time is inclusive.
                if entry["time"] > start_ts:
                    data.append((lat, long, json.dumps(entry["values"]), entry["time"]))
            new_offset.update({f"offset_{lat}_{long}": weather[-1]["time"]})
        return (data, new_offset)

    @staticmethod
    def _fetch_weather(lat: float, long: float, api_key: str, session):
        """Fetches weather data for the given latitude and longitude using a REST API."""
        url = f"https://api.tomorrow.io/v4/weather/forecast?location={lat},{long}&apikey={api_key}"
        response = session.get(url)
        response.raise_for_status()
        return response.json()["timelines"]
Now that we have defined the simple reader class, we need to wire it into an implementation of the DataSource abstract class.
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType, StructField, DoubleType, StringType


class WeatherDataSource(DataSource):
    """
    A custom PySpark data source for fetching weather data from tomorrow.io for
    given locations (latitude, longitude).

    Options
    -------
    - locations: specify a list of (latitude, longitude) tuples.
    - apikey: specify the API key for the weather service (tomorrow.io).
    - frequency: specify the frequency of the data ("minutely", "hourly",
      "daily"). Default is "minutely".
    """

    @classmethod
    def name(cls):
        """Returns the name of the data source."""
        return "weather"

    def __init__(self, options):
        """Initialize with the provided options."""
        self.options = options
        self.frequency = options.get("frequency", "minutely")
        if self.frequency not in ["minutely", "hourly", "daily"]:
            raise ValueError(f"Unsupported frequency: {self.frequency}")

    def schema(self):
        """Defines the output schema of the data source."""
        return StructType([
            StructField("latitude", DoubleType(), True),
            StructField("longitude", DoubleType(), True),
            StructField("weather", StringType(), True),
            StructField("timestamp", StringType(), True),
        ])

    def simpleStreamReader(self, schema: StructType):
        """Returns an instance of the reader for this data source."""
        return WeatherSimpleStreamReader(schema, self.options)
Now that we have defined the DataSource and wired in an implementation of the streaming reader, we need to register the DataSource with the Spark session.
spark.dataSource.register(WeatherDataSource)
This makes the weather data source a new streaming source that works with the familiar DataFrame operations data engineers are comfortable using. This point is worth stressing because custom data sources like this benefit the broader organization: with a more object-oriented approach, anyone in the organization can reuse this data source whenever they need weather data in their use case. For that reason, the data engineers may want to extract custom data sources into a Python wheel library for reuse in other pipelines.
Below, we see how easy it is for a data engineer to leverage the custom stream.
sites = """[
    (60.3933, 5.8341),    # Snorre Oil Field, Norway
    (58.757, 2.198),      # Schiehallion, UK
    (58.871, 4.862),      # Clair field, UK
    (57.645, 3.164),      # Elgin-Franklin, UK
    (54.932, -5.498),     # Sean field, UK
    (-14.849, 12.395),    # Angola offshore
    (1.639, 100.468),     # Malampaya, Philippines
    (-27.0454, 152.1213), # Australia offshore
    (38.1, -119.8),       # California offshore
    (52.784, 1.698)       # Leman, North Sea
]"""
display(
    spark.readStream.format("weather")
    .option("locations", sites)
    .option("apikey", "tomorrow_io_api_key")
    .load()
)
Example results:

Other considerations
When to use the partition-aware API
Now that we have walked through the Python Data Source's "simple" API, let's explain the option for partition awareness. Partition-aware data sources let you parallelize data generation. In our example, a partition-aware implementation would divide the locations across multiple worker tasks so that the REST API calls fan out across the workers and the cluster. Again, our example does not need this sophistication because the expected data volume is low.
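To illustrate the idea, here is a minimal sketch of what a partition-aware version could look like. It assumes the DataSourceStreamReader and InputPartition classes from pyspark.sql.datasource, reuses the helper methods defined on WeatherSimpleStreamReader above, and collapses the offset to a single timestamp for brevity; treat it as a sketch rather than a tested implementation.

# A sketch of a partition-aware reader: one partition per wellhead site, so the
# REST calls run in parallel on the executors.
from datetime import datetime, timezone
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class LocationPartition(InputPartition):
    def __init__(self, lat, long, start_ts, end_ts):
        self.lat = lat
        self.long = long
        self.start_ts = start_ts
        self.end_ts = end_ts


class WeatherStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.locations = WeatherSimpleStreamReader._parse_locations(options.get("locations", "[]"))
        self.api_key = options.get("apikey", "")
        self.frequency = options.get("frequency", "minutely")

    def initialOffset(self):
        return {"ts": "2024-09-01T00:00:00Z"}

    def latestOffset(self):
        # Each microbatch covers the window (previous offset, now].
        return {"ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")}

    def partitions(self, start: dict, end: dict):
        # One partition per location; Spark schedules each on a worker task.
        return [LocationPartition(lat, long, start["ts"], end["ts"])
                for (lat, long) in self.locations]

    def read(self, partition: LocationPartition):
        # Runs on an executor and fetches weather for this partition's site only.
        import json
        import requests
        session = requests.Session()
        timelines = WeatherSimpleStreamReader._fetch_weather(
            partition.lat, partition.long, self.api_key, session)[self.frequency]
        for entry in timelines:
            if partition.start_ts < entry["time"] <= partition.end_ts:
                yield (partition.lat, partition.long,
                       json.dumps(entry["values"]), entry["time"])


# Wiring it in would only require adding this method to WeatherDataSource:
#     def streamReader(self, schema):
#         return WeatherStreamReader(schema, self.options)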
Batch vs. stream APIs
Depending on the use case and whether you need the API to generate the source stream or sink the data, you should focus on implementing different methods. In our example, we don't worry about sinking data, and we could also have included a batch reader implementation. In practice, you can focus on implementing only the classes required for your specific use case.
|  | source | sink |
|---|---|---|
| batch | reader() | writer() |
| streaming | streamReader() or simpleStreamReader() | streamWriter() |
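To make the batch row of the table concrete, here is a minimal sketch of what a reader() implementation for the same weather source could look like. It assumes the DataSourceReader class from pyspark.sql.datasource and reuses the helpers defined on WeatherSimpleStreamReader above; it is an illustration, not part of the original pipeline.

# A sketch of a batch reader for the weather source: no offsets, just one
# fetch per location each time the DataFrame is read.
import json
import requests
from pyspark.sql.datasource import DataSourceReader


class WeatherBatchReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.locations = WeatherSimpleStreamReader._parse_locations(options.get("locations", "[]"))
        self.api_key = options.get("apikey", "")
        self.frequency = options.get("frequency", "minutely")

    def read(self, partition):
        # Emit one row per forecast entry for every configured location.
        session = requests.Session()
        for lat, long in self.locations:
            timelines = WeatherSimpleStreamReader._fetch_weather(
                lat, long, self.api_key, session)[self.frequency]
            for entry in timelines:
                yield (lat, long, json.dumps(entry["values"]), entry["time"])


# Wiring it in would only require adding this method to WeatherDataSource:
#     def reader(self, schema):
#         return WeatherBatchReader(schema, self.options)

With that method in place, the same "weather" format could also be read in batch mode via spark.read.format("weather").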
When to use the Writer APIs
This article has focused on the Reader APIs used in readStream. The Writer APIs allow similar arbitrary logic on the output side of the data pipeline. For example, let's assume that the operations managers at the wellhead want the data pipeline to call an API at the wellhead site that displays a red/yellow/green equipment status driven by the pipeline's logic. The Writer API gives data engineers the same opportunity to encapsulate that logic and expose a data sink that operates like the familiar writeStream formats.
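As a rough sketch of that scenario, the code below posts a status to a hypothetical wellhead REST endpoint. It assumes the DataSourceStreamWriter and WriterCommitMessage classes from pyspark.sql.datasource; the endpoint URL and the site/status column names are invented for illustration.

# A sketch of a streaming sink that pushes equipment status to a REST endpoint.
import requests
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage


class WellheadStatusWriter(DataSourceStreamWriter):
    def __init__(self, options):
        self.endpoint = options.get("endpoint", "https://example.com/wellhead/status")

    def write(self, iterator):
        # Runs on an executor for one partition of the microbatch.
        session = requests.Session()
        for row in iterator:
            session.post(self.endpoint, json={"site": row["site"], "status": row["status"]})
        return WriterCommitMessage()

    def commit(self, messages, batchId):
        # Called on the driver after all partitions of the microbatch succeed.
        pass

    def abort(self, messages, batchId):
        # Called on the driver if any task in the microbatch fails.
        pass


class WellheadStatusDataSource(DataSource):
    @classmethod
    def name(cls):
        return "wellhead_status"

    def streamWriter(self, schema, overwrite):
        return WellheadStatusWriter(self.options)

After registering it with spark.dataSource.register(WellheadStatusDataSource), the sink could be used with df.writeStream.format("wellhead_status") just like any built-in format.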
Conclusion
"Simplicity is the ultimate sophistication." – Leonardo da Vinci
As architects and data engineers, we now have an opportunity to simplify batch and streaming workloads using the PySpark custom data source API. As you explore opportunities for new data sources that could benefit your data teams, consider separating those data sources for reuse across the enterprise, for example through the use of a Python wheel.
"The Python Data Source API is exactly what we needed. It provides an opportunity for our data engineers to modularize the code necessary for interacting with our REST APIs and SDKs. The fact that we can now build, test, and surface reusable Spark data sources across the org will help our teams move faster and have more confidence in their work."
– Bryce Bartmann, Chief Digital Technology Advisor, Shell
In conclusion, the Python Data Source API for Apache Spark™ is a powerful addition that addresses significant challenges previously faced by data engineers working with complex data sources and sinks, particularly in streaming contexts. Whether using the "simple" or partition-aware API, engineers now have the tools to efficiently integrate a broader array of data sources and sinks into their Spark pipelines. As our walkthrough and the example code demonstrated, implementing and using this API is straightforward, enabling quick wins for predictive maintenance and other use cases. The Databricks documentation (and the open source documentation) explains the API in more detail, and several Python data source examples can be found here.
Finally, the emphasis on creating custom data sources as modular, reusable components cannot be overstated. By abstracting these data sources into standalone libraries, teams can foster a culture of code reuse and collaboration, further enhancing productivity and innovation. As we continue to explore and push the boundaries of what is possible with big data and IoT, technologies like the Python Data Source API will play a pivotal role in shaping the future of data-driven decision-making in the energy sector and beyond.
If you are already a Databricks customer, grab and modify one of these examples to unlock the data that is sitting behind your REST APIs. If you are not yet a Databricks customer, get started for free and try one of the examples today.