pypond package¶
Subpackages¶
- pypond.io package
- pypond.processor package
- Submodules
- pypond.processor.aggregator module
- pypond.processor.align module
- pypond.processor.base module
- pypond.processor.collapser module
- pypond.processor.converter module
- pypond.processor.filler module
- pypond.processor.filter module
- pypond.processor.mapper module
- pypond.processor.offset module
- pypond.processor.rate module
- pypond.processor.selector module
- pypond.processor.taker module
- Module contents
Submodules¶
pypond.bases module¶
Common base classes and mixins.
- class pypond.bases.Observable¶ Bases: pypond.bases.PypondBase
Base class for objects in the processing chain that need other objects to listen to them. It provides a basic interface to define those relationships and to emit events to the interested observers.
- add_observer(observer)¶ Add an observer if it does not already exist.
- emit(event)¶ Emit an event to the observers.
- flush()¶ Flush the observers.
- has_observers()¶ Does the object have observers?
- class pypond.bases.PypondBase¶ Bases: object
Universal base class. Used to provide common functionality (logging, etc.) to all the other classes.
- pypond.bases.setup_log(log_path=None)¶ Usage: _log('main.start', 'happy simple log event') _log('launch', 'more={0}, complex={1} log=event'.format(100, 200))
pypond.collection module¶
Implementation of Pond Collection class.
- class pypond.collection.Collection(instance_or_list=None, copy_events=True)¶ Bases: pypond.io.input.Bounded
A collection is a list of Events. You can construct one from another collection or from a list of Events. Calling add_event() on a collection returns a new collection.
Basic operations on the list of events are also possible: you can iterate over the collection with a for..in loop, get the size() of the collection, and access a specific element with at().
Initialize from a copy, a list, etc. The instance_or_list arg can be:
- a Collection object (copy ctor)
- a python list
- a pyrsistent.pvector
The list or pvector will contain Events.
Parameters: - instance_or_list (list, Collection, pyrsistent.pvector) – A collection object to copy or a list of Event objects.
- copy_events (bool, optional) – Copy the event list when using the copy constructor; otherwise the new object has an empty event list.
- __str__()¶ Calls to_string().
to_string() is already being tested, so skip coverage.
Returns: String representation of the object. Return type: str
- add_event(event)¶ Add an event to the payload and return a new Collection object.
Parameters: event (Event) – Event object to add to the collection. Returns: New collection with the event added to it. Return type: Collection
- aggregate(func, field_path=None)¶ Aggregates the events down using a user-defined function to do the reduction. Only a single column can be aggregated on, so this takes a field_path, NOT a field_spec.
This is essentially a wrapper around map/reduce, constraining it to a single column and returning the value rather than the dict from map().
Parameters: - func (function) – Function to pass to map/reduce to aggregate.
- field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
Returns: The aggregated value, so it depends on the kind of data being handled and the aggregation being done.
Return type: various
- at(pos)¶ Returns an item in the collection by its index position.
Creates a new object via the copy ctor.
Parameters: pos (int) – Index of the event to be retrieved. Returns: A new Event object of the event at index pos. Return type: Event Raises: CollectionException – Raised if there is an index error.
- at_first()¶ Retrieve the first item in this collection.
Returns: An event instance. Return type: Event
- at_key(searchkey)¶ Returns a list of events in the Collection that have the exact key (time, timerange or index) given by searchkey. Note that this is an O(n) search for the time specified, since collections are an unordered bag of events.
Parameters: searchkey (datetime, str, TimeRange) – The key of the event. Returns: List of all events at that key. Return type: list
- at_last()¶ Return the last event item in this collection.
Returns: An event instance. Return type: Event
- at_time(time)¶ Return an item by time. Primarily a utility method that sits in front of bisect() and fetches using at().
If you have events at 12:00 and 12:02 and you query at 12:01, the one at 12:00 will be returned; an exact match returns that event.
Parameters: time (datetime.datetime) – Datetime object >= the event to be returned. Must be an aware UTC datetime object. Returns: A new Event instance via at(). Return type: Event
- avg(field_path=None, filter_func=None)¶ Get the average of the values.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Average value.
Return type: int or float
- bisect(dtime, b=0)¶ Finds the index of the event falling just at or before the supplied time: every event at the returned index or less has a time at or before the supplied dtime, and every event after that index has a later time.
Optionally supply a begin index to start searching from. See the docstring for at_time().
Parameters: - dtime (datetime.datetime) – Datetime object >= the event to be returned. Must be an aware UTC datetime object.
- b (int, optional) – Array index to start searching from.
Returns: The index of the searched-for event.
Return type: int
Raises: CollectionException – Raised if given a naive or non-UTC dtime.
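The search semantics above can be sketched with the stdlib bisect module. This is a minimal illustration, not the pypond implementation: bisect_timestamps is a hypothetical helper operating on a sorted list of epoch-ms timestamps standing in for the Collection's events.

```python
import bisect as _bisect
from datetime import datetime, timezone

def bisect_timestamps(timestamps_ms, dtime, b=0):
    # Reject naive or non-UTC datetimes, as the documented method does.
    if dtime.tzinfo is None or dtime.utcoffset().total_seconds() != 0:
        raise ValueError('dtime must be an aware UTC datetime')
    target_ms = int(dtime.timestamp() * 1000)
    # bisect_right gives the insertion point; the entry just before it is
    # the last event at or before the target time.
    return _bisect.bisect_right(timestamps_ms, target_ms, lo=b) - 1

# Events at 12:00 and 12:02 UTC; a query at 12:01 resolves to the 12:00 event.
noon = datetime(2016, 1, 1, 12, 0, tzinfo=timezone.utc)
noon_ms = int(noon.timestamp() * 1000)
timestamps = [noon_ms, noon_ms + 120000]
query = datetime(2016, 1, 1, 12, 1, tzinfo=timezone.utc)
```

at_time() then simply fetches the event at the returned index via at().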
- clean(field_path=None)¶ Returns a new Collection by testing the field_path values for validity (not NaN, null or undefined). The resulting Collection will be clean for that field_path.
Parameters: field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
Returns: New collection containing only "clean" events. Return type: Collection
- collapse(field_spec_list, name, reducer, append=True)¶ Takes a field_spec_list (list of column names) and collapses them to a new column which is the reduction of the matched columns in the field_spec_list.
Parameters: - field_spec_list (list) – List of columns to collapse. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'].
- name (str) – Name of the new column containing the collapsed data.
- reducer (function) – Function to pass to the reducer.
- append (bool, optional) – Append the collapsed column to the existing data, or make new events with only that column.
Returns: New collection containing the collapsed data.
Return type: Collection
- count()¶ Get the count; calls size().
Returns: Number of events in the collection. Return type: int
- dedup()¶ Remove duplicates from the Collection. If duplicates exist with the same key but different values, the later event's values are used.
Returns: A new collection without duplicates. Return type: Collection
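The "later event wins" dedup rule can be sketched in plain python. dedup_events is a hypothetical helper, not part of the pypond API; (key, data) pairs stand in for Events keyed by time/index/timerange.

```python
def dedup_events(events):
    # Later entries overwrite earlier ones that share a key, while the
    # order of first appearance is preserved (dicts keep insertion order).
    seen = {}
    for key, data in events:
        seen[key] = data
    return list(seen.items())

# Two events share the key 1000; the later value (3) wins.
events = [(1000, {'value': 1}), (2000, {'value': 2}), (1000, {'value': 3})]
```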
- static equal(coll1, coll2)¶ Test to see if the instances are the same instance.
Parameters: - coll1 (Collection) – A collection.
- coll2 (Collection) – Another collection.
Returns: True if the same instance.
Return type: bool
- event_list()¶ Returns the raw immutable event list.
Returns: Raw immutable event list. Return type: pyrsistent.pvector
- event_list_as_list()¶ Return a python list of the event list.
Returns: Thawed version of the internal immutable data structure. Return type: list
- event_list_as_map()¶ Return the events in the collection as a dict of lists, where the key is the timestamp, index or timerange and the value is a list of the events with that key.
Returns: Dict mapping each key to a list of events. Return type: dict
- events()¶ Generator to allow for..in loops over series.events():
for i in series.events(): do_stuff(i)
Returns: An iterator to loop over the events. Return type: iterator
- filter(func)¶ Filter the collection's event list with the supplied function. The function will be passed each Event object and must return a boolean. If it returns True, the event is included.
def is_even(event): return bool(event.get('some_value') % 2 == 0)
would produce a new collection where 'some_value' contains only even numbers.
Parameters: func (function) – Function to filter with. Returns: New collection containing the filtered events. Return type: Collection
- first(field_path=None, filter_func=None)¶ Get the first value in the collection for the field_path.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Type varies depending on the underlying data.
Return type: depends on data
- is_chronological()¶ Checks that the events in this collection are in chronological order.
Returns: True if the events are in chronological order. Return type: bool
- last(field_path=None, filter_func=None)¶ Get the last value in the collection for the field_path.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Type varies depending on the underlying data.
Return type: depends on data
- map(func)¶ Map function. Apply a function to the collection's events and return a new Collection from the resulting events. The function must create a new Event* instance.
def in_only(event): # make new events with only the data value "in" return Event(event.timestamp(), {'in': event.get('in')})
Parameters: func (function) – Mapper function. Returns: New collection containing the mapped events. Return type: Collection
- max(field_path=None, filter_func=None)¶ Get the maximum value.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Maximum value.
Return type: int or float
- mean(field_path=None, filter_func=None)¶ Get the mean value.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Mean value.
Return type: int or float
- median(field_path=None, filter_func=None)¶ Get the median value.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Median value.
Return type: int or float
- min(field_path=None, filter_func=None)¶ Get the minimum value.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Minimum value.
Return type: int or float
- percentile(perc, field_path, method='linear', filter_func=None)¶ Gets the percentile perc within the Collection. This works the same way as numpy.
Parameters: - perc (int) – The percentile (should be between 0 and 100).
- field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- method (str, optional) – Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:
linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.
lower: i
higher: j
nearest: i or j, whichever is nearest
midpoint: (i + j) / 2
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation.
Returns: The percentile.
Return type: int or float
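The interpolation options above can be sketched in plain python over a list of numbers. This is an illustrative, numpy-style implementation, not pypond's own code; percentile is a hypothetical helper.

```python
import math

def percentile(values, perc, method='linear'):
    # numpy-style percentile over a plain list of numbers.
    xs = sorted(values)
    k = (len(xs) - 1) * (perc / 100.0)  # fractional index of the percentile
    i, j = math.floor(k), math.ceil(k)
    if i == j:
        return xs[i]  # percentile falls exactly on a data point
    fraction = k - i
    if method == 'linear':
        return xs[i] + (xs[j] - xs[i]) * fraction
    if method == 'lower':
        return xs[i]
    if method == 'higher':
        return xs[j]
    if method == 'nearest':
        return xs[j] if fraction >= 0.5 else xs[i]
    if method == 'midpoint':
        return (xs[i] + xs[j]) / 2.0
    raise ValueError('unknown method: {0}'.format(method))
```

For [1, 2, 3, 4] at perc=75 the fractional index is 2.25, so linear interpolates between 3 and 4.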
- quantile(num, field_path=None, method='linear')¶ Gets num quantiles within the Collection.
Parameters: - num (int) – Number of quantiles to divide the Collection into.
- field_path (None, optional) – The field to return as the quantile. If not set, defaults to 'value'.
- method (str, optional) – Specifies the interpolation method to use when the desired quantile lies between two data points. Options are:
linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.
lower: i
higher: j
nearest: i or j, whichever is nearest
midpoint: (i + j) / 2
Returns: A list of quantiles.
Return type: list
- range()¶ From the range of times, or Indexes within the TimeSeries, return the extents of the Collection/TimeSeries as a TimeRange.
Returns: Extents as a time range. Return type: TimeRange
- static same(coll1, coll2)¶ Test to see if the collections have the same values.
Parameters: - coll1 (Collection) – A collection.
- coll2 (Collection) – Another collection.
Returns: True if the same values.
Return type: bool
- set_events(events)¶ Create a new Collection from this one, setting the internal list of events.
Parameters: events (list or pyrsistent.pvector) – A list of events. Returns: A new collection with the event list set to the events arg. Return type: Collection Raises: CollectionException – Raised on a wrong arg type.
- size()¶ Number of items in the collection.
Returns: Number of items in the collection. Return type: int
- size_valid(field_path=None)¶ Returns the number of valid items in this collection.
Uses the field_path to look up values in all events. It then counts the number that are considered valid, i.e. are not NaN, undefined or null.
Parameters: field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
Returns: Number of valid <field_path> values in all of the Events. Return type: int
- slice(begin, end)¶ Perform a slice of the events within the Collection. Returns a new Collection representing the portion from begin up to but not including end, using typical python [slice:syntax].
Parameters: - begin (int) – Slice begin.
- end (int) – Slice end.
Returns: New collection with the sliced payload.
Return type: Collection
- sort(field_path)¶ Sorts the Collection using the value referenced by field_path.
Parameters: field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
Returns: New collection of sorted values. Return type: Collection
- sort_by_time()¶ Return a new instance of this collection with all of the events sorted by timestamp.
Returns: A copy of this collection with the events chronologically sorted. Return type: Collection
- stdev(field_path=None, filter_func=None)¶ Get the standard deviation.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Standard deviation.
Return type: int or float
- sum(field_path=None, filter_func=None)¶ Get the sum.
Parameters: - field_path (str, list, tuple, None, optional) – Name of a single value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: Summed value.
Return type: int or float
- to_json()¶ Returns the collection as a json object.
This is analogous to json.loads(s) - it produces the actual vanilla data structure.
Returns: A thawed list of Event objects. Return type: list
- to_string()¶ Returns the collection as a string, useful for serialization.
In JS land, this is synonymous with __str__ or __unicode__.
Uses a custom object encoder because this is a list of Event* objects.
Returns: String representation of this object. Return type: str
pypond.event module¶
Implementation of the Pond Event classes.
http://software.es.net/pond/#/events
- class pypond.event.Event(instance_or_time, data=None)¶ Bases: pypond.event.EventBase
A generic event. This represents a data object at a single timestamp, supplied at initialization.
The timestamp may be a python date object, datetime object, or ms since UNIX epoch. It is stored internally as a datetime object.
The data may be any type.
Asking the Event object for the timestamp returns an integer copy of the number of ms since the UNIX epoch. There’s no method on the Event object to mutate the Event timestamp after it is created.
The creation of an Event is done by combining two parts: the timestamp (or time range, or Index…) and the data.
To construct you specify the timestamp as either:
- a python date or datetime object
- millisecond timestamp: the number of ms since the UNIX epoch
To specify the data you can supply either:
- a python dict
- a pyrsistent.PMap created with pyrsistent.freeze(), or
- a simple type such as an integer. In the case of the simple type this is a shorthand for supplying {“value”: v}.
If supplying a PMap for either of the args (rather than supplying a python dict and letting the Event class handle it which is preferred), create it with freeze() and not pmap(). This is because any nested dicts must similarly be made immutable and pmap() will only freeze the “outer” dict.
Parameters: - instance_or_time (Event, pyrsistent.PMap, int, datetime.datetime) – An event for copy constructor, a fully formed and formatted immutable data payload, or an int (epoch ms) or a datetime.datetime object to create a timestamp from.
- data (None, optional) – Could be dict/PMap/int/float/str to use for data payload.
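The constructor's normalization rules described above can be sketched in plain python. to_ms and normalize_data are hypothetical helpers illustrating the documented behavior, not part of the pypond API.

```python
from datetime import datetime, timezone

def to_ms(time_arg):
    # Accept an epoch-ms int or a datetime, as the Event constructor does.
    if isinstance(time_arg, datetime):
        return int(time_arg.timestamp() * 1000)
    if isinstance(time_arg, int):
        return time_arg
    raise TypeError('unsupported timestamp type')

def normalize_data(data):
    # The documented shorthand: a simple type v stands for {'value': v}.
    return data if isinstance(data, dict) else {'value': data}
```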
- static avg(events, field_spec=None, filter_func=None)¶ combine() called with an averaging function as the reducer.
Parameters: - events (list) – A list of Event objects.
- field_spec (list, str, None, optional) – Column or columns to look up. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'] or a single 'string.like.this'. If None, all columns will be operated on.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: A list containing the averaged events.
Return type: list
- begin()¶ The begin time of this Event, which is just the timestamp.
Returns: Datetime object. Return type: datetime.datetime
- collapse(field_spec_list, name, reducer, append=False)¶ Collapses this event's columns, represented by the field_spec_list, into a single column. The collapsing itself is done with the reducer function. Optionally the collapsed column can be appended to the existing columns, or replace them (the default).
Parameters: - field_spec_list (list) – List of columns to collapse. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'].
- name (str) – Name of the new column with the collapsed data.
- reducer (function) – Function to pass to the reducer.
- append (bool, optional) – Set True to add the new column to the existing data dict, False to create a new Event with just the collapsed data.
Returns: New event object.
Return type: Event
- static combine(events, field_spec, reducer)¶ Combines multiple events together into a new list of events, one for each time/index/timerange of the source events. The list of events may be specified as a list or a pyrsistent.pvector. Combining acts on the fields specified in the field_spec and uses the reducer function to take the multiple values and reduce them down to one.
The result will be of the same form as the input: pass in a list of events and you get a list of events back; pass a pvector of events and you get a pvector of events back.
This is the general version of Event.sum() and Event.avg(). If those common use cases are what you want, just use those functions. If you want to specify your own reducer, use this function.
See also: TimeSeries.timeseries_list_sum()
Parameters: - events (list) – List of Event objects.
- field_spec (string, list) – Column or columns to look up. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'] or a single 'string.like.this'. If None, all columns will be operated on.
- reducer (function) – Reducer function to apply to the column data.
Returns: List of new events.
Return type: list
Raises: EventException – Raised if illegal input is received.
- end()¶ The end time of this Event, which is just the timestamp.
Returns: Datetime object. Return type: datetime.datetime
- static is_duplicate(event1, event2, ignore_values=True)¶ Returns whether the two supplied events are duplicates of each other. By default, duplicate means that the timestamps are the same. This is the case with incoming events, where the second event is either known to be a duplicate of the first, or supersedes the first. Pass ignore_values=False to get a full compare.
Parameters: - event1 (Event, IndexedEvent or TimeSeriesEvent) – One of the event variants.
- event2 (Event, IndexedEvent or TimeSeriesEvent) – One of the event variants.
- ignore_values (bool, optional) – If set to False, the values of the events will be compared as well. The default (True) means only the type and key are compared.
Returns: True if the events are duplicates.
Return type: bool
- static is_valid_value(event, field_path=None)¶ The same as Event.value(), only it will return False if the value is undefined, NaN or None.
Parameters: - event (Event) – An event.
- field_path (str, list, tuple, None, optional) – Name of the value to look up. If None, defaults to ['value']. "Deep" syntax is either ['deep', 'value'], ('deep', 'value',) or 'deep.value'.
Returns: False if undefined, NaN, or None.
Return type: bool
- key()¶ Return the timestamp as ms since the epoch.
Returns: ms since epoch. Return type: int
- static map(events, field_spec=None)¶ Maps a list of events according to the selection specification passed in. The spec may be a single field name, a list of field names, or a function that takes an event and returns a key/value pair.
Example: given events with columns in and out - at 3am in=1, out=2 and at 4am in=3, out=4 - the result is {'in': [1, 3], 'out': [2, 4]}.
Parameters: - events (list) – A list of events.
- field_spec (str, list, func or None, optional) – Column or columns to map. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'] or a single 'string.like.this'. If None, then all columns will be mapped.
If field_spec is a function, the function should return a dict. The keys will become the "column names" used in the dict that is returned.
Returns: A dict of mapped columns/values.
Return type: dict
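The shape of the mapped result can be sketched in plain python. map_events is a hypothetical helper, not part of the pypond API; (timestamp, data) pairs stand in for Event objects.

```python
def map_events(events, field_spec):
    # Collect each requested column's values across all events into a list.
    fields = [field_spec] if isinstance(field_spec, str) else list(field_spec)
    mapped = {}
    for _ts, data in events:
        for field in fields:
            mapped.setdefault(field, []).append(data[field])
    return mapped

# The example from the docs: in/out columns at 3am and 4am.
events = [('3am', {'in': 1, 'out': 2}), ('4am', {'in': 3, 'out': 4})]
```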
- static map_reduce(events, field_spec, reducer)¶ Map and reduce: map() the events, then reduce() the result.
Parameters: - events (list) – A list of events.
- field_spec (str, list, func or None, optional) – Column or columns to map. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'] or a single 'string.like.this'. If None, then all columns will be mapped.
- reducer (function) – The reducer function.
Returns: A dict as returned by reduce().
Return type: dict
- static merge(events)¶ Merges multiple events together into a new list of events, one for each time/index/timerange of the source events. Merging is done on the data of each event. Values from later events in the list overwrite earlier values if fields conflict.
Common use cases:
- append events of different timestamps
- merge in events with one field to events with another
- merge in events that supersede the previous events
Parameters: events (list) – A list of a homogeneous kind of event. Returns: A list of the merged events. Return type: list Raises: EventException – Raised if the event list is not homogeneous.
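The merge semantics above can be sketched in plain python. merge_events is a hypothetical helper, not part of the pypond API; (key, data) pairs stand in for Events sharing a time/index/timerange key.

```python
def merge_events(events):
    # Data dicts sharing a key are merged into one; values from later
    # events overwrite earlier values when fields conflict.
    merged = {}
    for key, data in events:
        merged.setdefault(key, {}).update(data)
    return list(merged.items())

# Two events at key 1000 with different fields merge into one event.
events = [(1000, {'in': 1}), (1000, {'out': 2}), (2000, {'in': 3})]
```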
- static reduce(mapped, reducer)¶ Takes a dict of column/values as produced by map() and a reducer function, and returns the reduced result for each column. The reducer takes a list of values and returns a single calculated value.
Parameters: - mapped (dict) – Dict as produced by map().
- reducer (function) – The reducer function.
Returns: A dict of reduced values.
Return type: dict
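The per-column reduction can be sketched in plain python. reduce_mapped is a hypothetical helper, not part of the pypond API.

```python
def reduce_mapped(mapped, reducer):
    # Apply the reducer to each column of a dict as produced by map().
    return {column: reducer(values) for column, values in mapped.items()}
```

For example, reducing the mapped columns with the builtin sum collapses each value list down to its total.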
- static same(event1, event2)¶ A different name for is(), which is an invalid method name in python. Different than __eq__ - see the Object.is() JS documentation.
Check if the two objects are the same.
Parameters: - event1 (Event) – An event.
- event2 (Event) – Another event.
Returns: True if the event payloads are the same.
Return type: bool
- static selector(event, field_spec=None)¶ Function to select specific fields of an event using a field_spec, returning a new event with just those fields.
The field_spec currently can be:
- a single field name
- a list of field names
The function returns a new event.
Parameters: - event (Event) – Event to pull from.
- field_spec (str, list, tuple, None, optional) – Column or columns to look up. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'] or a single 'string.like.this'. If None, the default column 'value' will be used.
Returns: A new event object.
Return type: Event
- set_data(data)¶ Sets the data portion of the event and returns a new Event.
Parameters: data (dict) – New data payload for this event object. Returns: A new event object. Return type: Event
- static sum(events, field_spec=None, filter_func=None)¶ combine() called with a summing function as the reducer. All of the events need to have the same timestamp.
Parameters: - events (list) – A list of Event objects.
- field_spec (list, str, None, optional) – Column or columns to look up. Deep nested values can be retrieved with ['can.be', 'done.with', 'this.notation'] or a single 'string.like.this'. If None, all columns will be operated on.
- filter_func (function, None) – A function (static method) from the Filters class in module pypond.functions.Filters. It controls how bad or missing values (None, NaN, empty string) are cleansed or filtered during aggregation. If no filter is specified, the missing values are retained, which can cause errors.
Returns: A list containing the summed events.
Return type: list
Raises: EventException – Raised on mismatching timestamps.
- timestamp()¶ The timestamp of this data.
Returns: Datetime object. Return type: datetime.datetime
- timestamp_as_local_string()¶ The timestamp of this data, in local time, as a formatted string.
Returns: Formatted date string. Return type: str
- timestamp_as_utc_string()¶ The timestamp of this data, in UTC time, as a formatted string.
Returns: Formatted date string. Return type: str
- to_json()¶ Returns the Event as a JSON object, essentially
{time: ms_since_epoch, data: {key: value, ...}}
This is analogous to json.loads(s) - it produces the actual data structure from the object's internal data.
Returns: time/data keys. Return type: dict
- to_point(cols=None)¶ Returns a flat list starting with the timestamp, followed by the values. Can be given an optional list of columns so the returned list will have the values in that order. Primarily for the TimeSeries wire format.
Parameters: cols (list, optional) – List of data columns to order the data points in, so the TimeSeries wire format lines up correctly. If not specified, the points will be in whatever order dict.values() returns them. Returns: Epoch ms followed by the points. Return type: list
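The wire-format shape can be sketched in plain python. to_point here is a hypothetical standalone helper mirroring the documented behavior, not the pypond method itself.

```python
def to_point(timestamp_ms, data, cols=None):
    # Epoch ms first, then the values, ordered by cols when given;
    # otherwise fall back to the dict's own ordering.
    columns = cols if cols is not None else list(data.keys())
    return [timestamp_ms] + [data[c] for c in columns]
```

Passing an explicit column list pins the value order, which is what makes the wire format line up across events.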
-
type
()¶ Return type of the event object
Returns: The class of this event type. Return type: class
-
class
pypond.event.
EventBase
(underscore_d)¶ Bases:
pypond.bases.PypondBase
Common code for the event classes.
Parameters: underscore_d (pyrsistent.pmap) – Immutable dict-like object containing the payload for the events. -
__eq__
(other)¶ equality operator. need this to be able to check if the event_list in a collection is the same as another.
Parameters: other (Event) – Event object for == comparison. Returns: True if other event has same payload. Return type: bool
-
__str__
()¶ call to_string()
-
begin
()¶ abstract, override in subclass
Raises: NotImplementedError
– Needs to be implemented in subclasses.
-
data
()¶ Direct access to the event data. The result will be a pyrsistent.pmap.
Returns: The immutable data payload. Return type: pyrsistent.pmap
-
static
data_from_arg
(arg)¶ extract data from a constructor arg and make immutable.
Parameters: arg (dict, pmap, int, float, str) – Data payload as passed to one of the constructors. If dict or pmap, that is used as the data payload; any other value is presumed to be a simple payload of {‘value’: arg}. Returns: Immutable dict-like object Return type: pyrsistent.pmap Raises: EventException
– Raised on bad arg input.
-
end
()¶ abstract, override in subclass
Raises: NotImplementedError
– Needs to be implemented in subclasses.
-
get
(field_path=None)¶ Get specific data out of the Event. The data will be returned as a native structure. You can use a field_spec to address deep data. A field_spec could be “a.b” or it could be [‘a’, ‘b’]. Favor the list version please.
The field spec can have an arbitrary number of “parts.”
Parameters: field_path (str, list, tuple, None, optional) – Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’.
Returns: Type depends on underlying data Return type: various
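The field-path resolution described above can be sketched with a stdlib-only helper (an illustration of the semantics, not pypond's code):

```python
from functools import reduce

def get(data, field_path=None):
    """Resolve a str or list field path against nested dicts."""
    if field_path is None:
        field_path = ['value']              # the default column
    if isinstance(field_path, str):
        field_path = field_path.split('.')  # 'deep.value' -> ['deep', 'value']
    return reduce(lambda d, key: d[key], field_path, data)

payload = {'value': 1, 'deep': {'value': 3}}
get(payload, ['deep', 'value'])  # the favored list form
```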
-
static
index_from_args
(instance_or_index, utc=True)¶ create Index from a constructor arg.
Parameters: - instance_or_index (Index or str) – Index value as passed to a constructor
- utc (bool, optional) – Use UTC time internally; please don’t disable this.
Returns: New Index object from args.
Return type: Index Raises: EventException
– Raised on invalid arg.
-
stringify
()¶ Produce a json string of the internal data.
Returns: String representation of this object’s internal data. Return type: str
-
static
timerange_from_arg
(arg)¶ create TimeRange from a constructor arg.
Parameters: arg (list, tuple, pvector or TimeRange) – Time value as passed to one of the constructors. Returns: New TimeRange instance from args Return type: TimeRange Raises: EventException
– Raised on invalid arg.
-
timestamp
()¶ abstract, override in subclass
Raises: NotImplementedError
– Needs to be implemented in subclasses.
-
static
timestamp_from_arg
(arg)¶ extract timestamp from a constructor arg.
Parameters: arg (int or datetime.datetime) – Time value as passed to one of the constructors Returns: Datetime object that has been sanitized Return type: datetime.datetime Raises: EventException
– Does not accept unaware datetime objects.
-
to_json
()¶ abstract, override in subclasses.
Raises: NotImplementedError
– Needs to be implemented in subclasses.
-
to_string
()¶ Returns the Event as a string, useful for serialization. It’s a JSON string of the whole object.
In JS land, this is synonymous with __str__ or __unicode__
Returns: String representation of this object. Return type: str
-
ts
¶ A property to expose the datetime.datetime value returned by the timestamp() method. This is so we can support sorting of a list of events via the following method:
ordered = sorted(self._event_list, key=lambda x: x.ts)Returns: Returns the value returned by timestamp() Return type: datetime.datetime
-
value
(field_path=None)¶ Alias for get()
Parameters: field_path (str, list, tuple, None) – Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax is either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’.
Returns: Type depends on underlying data. Return type: various
-
pypond.exceptions module¶
Custom exception and warning classes.
-
exception
pypond.exceptions.
CollectionException
(value)¶ Bases:
exceptions.Exception
Custom Collection exception
-
exception
pypond.exceptions.
CollectionWarning
¶ Bases:
exceptions.Warning
Custom Collection warning
-
exception
pypond.exceptions.
EventException
(value)¶ Bases:
exceptions.Exception
Custom Event exception
-
exception
pypond.exceptions.
EventWarning
¶ Bases:
exceptions.Warning
Custom Event warning
-
exception
pypond.exceptions.
FilterException
(value)¶ Bases:
exceptions.Exception
Custom Filter exception
-
exception
pypond.exceptions.
FilterWarning
¶ Bases:
exceptions.Warning
Custom Filter warning
-
exception
pypond.exceptions.
FunctionException
(value)¶ Bases:
exceptions.Exception
Custom Function exception
-
exception
pypond.exceptions.
FunctionWarning
¶ Bases:
exceptions.Warning
Custom Function warning
-
exception
pypond.exceptions.
IndexException
(value)¶ Bases:
exceptions.Exception
Custom Index exception
-
exception
pypond.exceptions.
IndexWarning
¶ Bases:
exceptions.Warning
Custom Index warning
-
exception
pypond.exceptions.
PipelineException
(value)¶ Bases:
exceptions.Exception
Custom Pipeline exception
-
exception
pypond.exceptions.
PipelineIOException
(value)¶ Bases:
exceptions.Exception
Custom PipelineIO exception
-
exception
pypond.exceptions.
PipelineIOWarning
¶ Bases:
exceptions.Warning
Custom PipelineIO warning
-
exception
pypond.exceptions.
PipelineWarning
¶ Bases:
exceptions.Warning
Custom Pipeline warning
-
exception
pypond.exceptions.
ProcessorException
(value)¶ Bases:
exceptions.Exception
Custom Processor exception
-
exception
pypond.exceptions.
ProcessorWarning
¶ Bases:
exceptions.Warning
Custom Processor warning
-
exception
pypond.exceptions.
TimeRangeException
(value)¶ Bases:
exceptions.Exception
Custom TimeRange exception
-
exception
pypond.exceptions.
TimeRangeWarning
¶ Bases:
exceptions.Warning
Custom TimeRange warning
-
exception
pypond.exceptions.
TimeSeriesException
(value)¶ Bases:
exceptions.Exception
Custom TimeSeries exception
-
exception
pypond.exceptions.
TimeSeriesWarning
¶ Bases:
exceptions.Warning
Custom TimeSeries warning
-
exception
pypond.exceptions.
UtilityException
(value)¶ Bases:
exceptions.Exception
Custom Utility exception
-
exception
pypond.exceptions.
UtilityWarning
¶ Bases:
exceptions.Warning
Custom Utility warning
pypond.functions module¶
Functions to act as reducers/aggregators, etc.
-
class
pypond.functions.
Filters
¶ Bases:
object
Filter functions to pass to aggregation function factory methods.
These all control how the underlying aggregators handle missing/invalid values. Can pass things through (the default to all agg functions), ignore any bad values, transform any bad values to zero, or make the entire aggregation fail if there are any bad values.
-
static
ignore_missing
(events)¶ Pull out the bad values resulting in a shorter array.
-
static
keep_missing
(events)¶ no-op - default
-
static
none_if_empty
(events)¶ Return None if the event list is empty. Could be used to override the default behavior of Functions.avg(), etc.
-
static
propagate_missing
(events)¶ It’s all bad if there are missing values - return None if so.
-
static
zero_missing
(events)¶ Make bad values 0 - array will be the same length.
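The cleansing strategies can be illustrated on a plain list of values (a stdlib sketch of the semantics only; the real Filters methods operate on event value lists):

```python
import math

def _is_bad(value):
    """None, NaN and empty string count as missing/invalid."""
    return value is None or value == '' or (
        isinstance(value, float) and math.isnan(value))

def ignore_missing(values):
    return [v for v in values if not _is_bad(v)]       # shorter list

def zero_missing(values):
    return [0 if _is_bad(v) else v for v in values]    # same length

def propagate_missing(values):
    # it's all bad if there are missing values
    return None if any(_is_bad(v) for v in values) else values
```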
-
class
pypond.functions.
Functions
¶ Bases:
object
Utility class to contain the functions.
The inner() function is the one that does the actual processing and it returned by calling the outer named function. Previously one would pass Functions.sum to an aggregation or reducer method:
timeseries.aggregate(Functions.sum, 'in')
Now it is a factory to return the actual function:
timeseries.aggregate(Functions.sum(), 'in')
The static methods in the Filters class can be passed to the outer factory method to control how bad values are handled:
timeseries.aggregate(Functions.sum(Filters.zero_missing), 'in')
-
static
avg
(flt=<function keep_missing>)¶
-
static
count
(flt=<function keep_missing>)¶
-
static
difference
(flt=<function keep_missing>)¶
-
static
first
(flt=<function keep_missing>)¶
-
static
keep
(flt=<function keep_missing>)¶
-
static
last
(flt=<function keep_missing>)¶
-
static
max
(flt=<function keep_missing>)¶
-
static
median
(flt=<function keep_missing>)¶
-
static
min
(flt=<function keep_missing>)¶
-
static
percentile
(perc, method='linear', flt=<function keep_missing>)¶
-
static
stddev
(flt=<function keep_missing>)¶
-
static
sum
(flt=<function keep_missing>)¶
-
pypond.functions.
f_check
(flt)¶ Set the default filter for aggregation operations when no filter is specified. When one is, make sure that it is a valid filter.
pypond.index module¶
Implementation of Pond Index class.
http://software.es.net/pond/#/index
-
class
pypond.index.
Index
(s, utc=True)¶ Bases:
pypond.bases.PypondBase
An index that represents as a string a range of time. That range may either be in UTC or local time. UTC is the default.
The actual derived timerange can be found using as_timerange(). This will return a TimeRange instance.
The original string representation can be found with to_string(). A nice version for date based indexes (e.g. 2015-03) can be generated with to_nice_string(fmt) (e.g. March, 2015).
The index string arg may be of two forms:
- 2015-07-14 (day)
- 2015-07 (month)
- 2015 (year)
or:
- 1d-278 (range, in n x days, hours, minutes or seconds)
Parameters: - s (str) – The index string in one of the aforementioned formats.
- utc (bool, optional) – Index interpreted as UTC or localtime. Please don’t set this to false since non-UTC times are the devil.
Raises: IndexException
– Raised if arg s could not be translated into a valid timerange/index.-
__str__
()¶ call to_string()
Returns: String representation of the object. Return type: str
-
as_string
()¶ Alias for to_string()
Returns: The index string as previously outlined. Return type: str
-
as_timerange
()¶ Returns the Index as a TimeRange
Returns: The underlying time range object. Return type: TimeRange
-
begin
()¶ Returns start date of the index.
Returns: Start date of the index. Return type: datetime.datetime
-
end
()¶ Returns end date of the index.
Returns: End date of the index. Return type: datetime.datetime
-
static
get_daily_index_string
(date, utc=True)¶ Generate an index string with day granularity.
Parameters: - date (datetime.datetime) – An aware UTC datetime object
- utc (bool, optional) – Render the index in local time; this is used for display purposes, to render charts in a localized way.
Returns: The formatted index string.
Return type: string
-
static
get_index_string
(win, dtime)¶ Return the index string given an index prefix and a datetime object. Example usage follows.
dtime = aware_dt_from_args(
    dict(year=2015, month=3, day=14, hour=7, minute=32, second=22))
idx_str = Index.get_index_string('5m', dtime)
self.assertEqual(idx_str, '5m-4754394')
previously: Generator.bucketIndex
Parameters: - win (str) – Prefix of the index string.
- dtime (datetime.datetime) – Datetime to generate index string from.
Returns: The index string.
Return type: str
-
static
get_index_string_list
(win, timerange)¶ Given the time range, return a list of strings of index values every <prefix> tick. Example usage follows (from test suite).
dtime_1 = aware_dt_from_args(
    dict(year=2015, month=3, day=14, hour=7, minute=30, second=0))
dtime_2 = aware_dt_from_args(
    dict(year=2015, month=3, day=14, hour=8, minute=29, second=59))
idx_list = Index.get_index_string_list('5m', TimeRange(dtime_1, dtime_2))
self.assertEqual(len(idx_list), 12)
self.assertEqual(idx_list[0], '5m-4754394')
self.assertEqual(idx_list[-1], '5m-4754405')
previously: Generator.bucketIndexList
Parameters: - win (str) – Prefix of the index string.
- timerange (TimeRange) – Time range object to generate index string from
Returns: A list of strings of index values at every “tick” in the range specified.
Return type: list
-
static
get_monthly_index_string
(date, utc=True)¶ Generate an index string with month granularity.
Parameters: - date (datetime.datetime) – An aware UTC datetime object
- utc (bool, optional) – Render the index in local time; this is used for display purposes, to render charts in a localized way.
Returns: The formatted index string.
Return type: string
-
static
get_yearly_index_string
(date, utc=True)¶ Generate an index string with year granularity.
Parameters: - date (datetime.datetime) – An aware UTC datetime object
- utc (bool, optional) – Render the index in local time; this is used for display purposes, to render charts in a localized way.
Returns: The formatted index string.
Return type: string
-
range_from_index_string
(idx_str, is_utc=True)¶ Generate the time range from the idx string.
The index string arg may be of two forms:
- 2015-07-14 (day)
- 2015-07 (month)
- 2015 (year)
or:
- 1d-278 (range, in n x days, hours, minutes or seconds)
and return a TimeRange for that time. The TimeRange may be considered to be local time or UTC time, depending on the utc flag passed in.
This was in src/util.js in the original project, but the only thing using the code in that util.js was the Index class, and it makes more sense having this as a class method and setting self._index_type makes further regex analysis of the index unnecessary.
Parameters: - idx_str (str) – The index string in one of the aforementioned formats
- is_utc (bool, optional) – Index interpreted as utc or localtime. Please don’t use localtime.
Returns: A time range made from the interpreted index string.
Return type: TimeRange Raises: IndexException
– Raised when the string format is determined to be invalid.
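The arithmetic behind the 1d-278 style index can be sketched in a few lines (a stdlib illustration assuming the suffix is the window position counted from the epoch; boundary conventions may differ slightly from pypond's):

```python
# milliseconds per supported duration unit
UNITS_MS = {'s': 1000, 'm': 60000, 'h': 3600000, 'd': 86400000}

def range_from_index_string(idx_str):
    """Derive (begin_ms, end_ms) from an index string like '1d-278'."""
    size, pos = idx_str.split('-')
    duration_ms = int(size[:-1]) * UNITS_MS[size[-1]]
    begin = int(pos) * duration_ms
    return begin, begin + duration_ms

range_from_index_string('1d-278')  # the 278th whole day since the epoch
```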
-
to_json
()¶ Returns the Index as JSON, which will just be its string representation
This is actually like json.loads(s) - produces the actual data structure.
Returns: The index string as previously outlined. Return type: str
-
to_nice_string
(fmt=None)¶ For the calendar range style Indexes, this lets you return that calendar range as a human readable format, e.g. “June, 2014”. The format specified is a Moment.format.
Originally implemented at Util.niceIndexString in the JS source, this is just a greatly simplified version using self._index_type.
Parameters: fmt (str, optional) – User can pass in a valid strftime() format string. Returns: The index string as a formatted (strftime()) time. Return type: str
-
to_string
()¶ Simply returns the Index as its string
In JS land, this is synonymous with __str__ or __unicode__
Returns: The index string as previously outlined. Return type: str
-
utc
¶ accessor for internal utc boolean.
-
static
window_duration
(win)¶ duration in ms given a window duration string.
previously: Generator.getLengthFromSize.
Parameters: win (str) – An index string in the previously mentioned 1d-278 style format. Returns: Duration of the index/range in ms. Return type: int
-
static
window_position_from_date
(win, dtime)¶ window position from datetime object. Called by get_index_string_list().
previously: Generator.getBucketPosFromDate
Parameters: - win (str) – Prefix of the index string.
- dtime (datetime.datetime) – Datetime to calculate suffix from.
Returns: The suffix for the index string.
Return type: int
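The position is just integer division of the epoch timestamp by the window duration; this stdlib sketch reproduces the ‘5m-4754394’ example shown under get_index_string() above:

```python
from datetime import datetime, timezone

def window_position(duration_ms, dtime):
    """Integer bucket position of an aware datetime within fixed windows."""
    epoch_ms = int(dtime.timestamp() * 1000)
    return epoch_ms // duration_ms

dt = datetime(2015, 3, 14, 7, 32, 22, tzinfo=timezone.utc)
window_position(5 * 60 * 1000, dt)  # the suffix of the '5m-4754394' index
```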
pypond.indexed_event module¶
Event with a time range specified as an index.
-
class
pypond.indexed_event.
IndexedEvent
(instance_or_begin, data=None, utc=True)¶ Bases:
pypond.event.EventBase
Associates a time range specified as an index.
The creation of an IndexedEvent is done by combining two parts: the Index and the data.
To construct you specify an Index, along with the data.
The index may be an Index, or a string.
- To specify the data you can supply either:
- a python dict containing key values pairs
- a pyrsistent.pmap, or
- a simple type such as an integer. In the case of the simple type this is a shorthand for supplying {“value”: v}.
Parameters: - instance_or_begin (Index, pyrsistent.pmap, or str.) – Index for copy constructor, pmap as the fully formed internals or a string arg to the Index class.
- data (dict or pyrsistent.pmap, optional) – Data payload.
- utc (bool, optional) – UTC or localtime to create index in. Please don’t not use UTC. Yes, that’s a double negative.
-
begin
()¶ The begin time of this Event, which will be just the timestamp.
Returns: Datetime of the beginning of the range. Return type: datetime.datetime
-
end
()¶ The end time of this Event, which will be just the timestamp.
Returns: Datetime of the end of the range. Return type: datetime.datetime
-
index
()¶ Returns the Index associated with the data in this Event.
Returns: The underlying index object Return type: Index
-
index_as_string
()¶ Returns the Index as a string, same as event.index().to_string().
Returns: String version of the underlying Index. Return type: str
-
set_data
(data)¶ Sets the data portion of the event and returns a new IndexedEvent.
Parameters: data (dict) – The new data payload for this event object.
Returns: A new IndexedEvent with the provided payload.
Return type: IndexedEvent
-
timerange
()¶ The TimeRange of this data.
Returns: Time range from the underlying index. Return type: TimeRange
-
timerange_as_local_string
()¶ The timerange of this data, in local time, as a string.
Returns: Underlying TimeRange as localtime string. Return type: str
-
timerange_as_utc_string
()¶ The timerange of this data, in UTC time, as a string.
Returns: Underlying TimeRange as UTC string. Return type: str
-
timestamp
()¶ The timestamp of this data, i.e. the beginning of the range.
Returns: Datetime of the beginning of the range. Return type: datetime.datetime
-
to_json
()¶ Returns the Event as a JSON object, essentially: {time: t, data: {key: value, …}}
This is actually like json.loads(s) - produces the actual vanilla data structure.
Returns: Dictionary representation of object internals. Return type: dict
-
to_point
(cols=None)¶ Returns a flat array starting with the timestamp, followed by the values. Doesn’t include the groupByKey (key).
Can be given an optional list of columns so the returned list will have the values in order. Primarily for the TimeSeries wire format.
Parameters: cols (list, optional) – List of columns to order the points in so the TimeSeries wire format is rendered correctly. Returns: Epoch ms followed by points. Return type: list
-
type
()¶ Return the class of this event type.
Returns: The class of this event type. Return type: class
pypond.pipeline module¶
Implementation of the Pond Pipeline classes.
http://software.es.net/pond/#/pipeline
-
class
pypond.pipeline.
Pipeline
(arg=None)¶ Bases:
pypond.bases.PypondBase
Build a new Pipeline.
A pipeline manages a processing chain, for either batch or stream processing of collection data.
The argument may be either:
- a Pipeline (copy ctor)
- a pyrsistent.PMap in which case the internal state will be constructed from the map.
Usually you would initialize a Pipeline using the factory function, rather than this object directly.
Parameters: arg (Pipeline, PMap, None) – See above. -
add_result
(arg1, arg2=None)¶ Add the incoming result from the processor callback.
Parameters: - arg1 (str) – Collection key string.
- arg2 (Collection or str) – Generally the incoming collection.
-
aggregate
(fields)¶ Uses the current Pipeline windowing and grouping state to build collections of events and aggregate them.
IndexedEvents will be emitted out of the aggregator based on the emitOn state of the Pipeline.
To specify what part of the incoming events should be aggregated together you specify a fields object. This is a map from fieldName to operator.
uin = Stream()
(
    Pipeline()
    .from_source(uin)
    .window_by('1h')
    .emit_on('eachEvent')
    .aggregate(
        {
            'in_avg': {'in': Functions.avg()},
            'out_avg': {'out': Functions.avg()}
        }
    )
    .to(EventOut, cback)
)
Parameters: fields (dict) – Fields and operators to be aggregated. Deep fields may be indicated by using this.style.notation. As in the above example, the fields.keys() are the names of the new columns to be created (or an old one to be overwritten), and the value is another dict - the key is the existing column and the value is the function to apply to it when creating the new column. Returns: The Pipeline Return type: Pipeline
-
align
(field_spec=None, window='5m', method='linear', limit=None)¶ Align entry point
-
as_events
(options=None)¶ Converts incoming TimeRangeEvents or IndexedEvents to Events. This is helpful since some processors will emit TimeRangeEvents or IndexedEvents, which may be unsuitable for some applications.
There are three options:
- use the beginning time (options = Options(alignment=’lag’))
- use the center time (options = Options(alignment=’center’))
- use the end time (options = Options(alignment=’lead’))
Parameters: options (Options) – The options, see above. Returns: The Pipeline. Return type: Pipeline
-
as_indexed_events
(options=None)¶ Converts incoming Events to IndexedEvents.
Note: It isn’t possible to convert TimeRangeEvents to IndexedEvents.
Parameters: options (Options) – Contains the conversion options. In this case, the duration string of the Index is expected. Must contain the key ‘duration’ and the duration string is of the form “1h” for one hour, “30s” for 30 seconds and so on. Returns: The Pipeline. Return type: Pipeline
-
as_time_range_events
(options=None)¶ Converts incoming Events or IndexedEvents to TimeRangeEvents.
There are three option for alignment:
- time range will be in front of the timestamp - ie: options = Options(alignment=’front’)
- time range will be centered on the timestamp - ie: options = Options(alignment=’center’)
- time range will be positioned behind the timestamp - ie: options = Options(alignment=’behind’)
The duration is of the form “1h” for one hour, “30s” for 30 seconds and so on.
Parameters: options (dict) – Args to add to Options - duration and alignment. Returns: The Pipeline Return type: Pipeline
-
clear_group_by
()¶ Remove the grouping from the pipeline. In other words recombine the events.
Returns: The Pipeline Return type: Pipeline
-
clear_results
()¶ Clear the result state of this Pipeline instance.
-
clear_window
()¶ Remove windowing from the Pipeline. This will return the pipeline to no window grouping. This is useful if you have first done some aggregation by some window size and then wish to collect together the all resulting events.
Returns: The Pipeline Return type: Pipeline
-
collapse
(field_spec_list, name, reducer, append=True)¶ Collapse a subset of columns using a reducer function.
Parameters: - field_spec_list (list) – List of columns to collapse. To retrieve deep nested values, use a list like [‘can.be’, ‘done.with’, ‘this.notation’].
- name (string) – The resulting output column’s name.
- reducer (function) – Function to use to do the reduction.
- append (bool) – Add the new column to the existing ones, or replace them.
Returns: The Pipeline.
Return type: Pipeline
-
count
(observer, force=True)¶ Outputs the count of events.
Parameters: - observer (function) – The callback function. This function will be passed collection.size(), window_key, group_by_key) as args.
- force (bool, optional) – Flush at the end of processing batch events, output again with possibly partial result
Returns: The Pipeline.
Return type: Pipeline
-
emit_on
(trigger)¶ Sets the condition under which an accumulated collection will be emitted. If specified before an aggregation this will control when the resulting event will be emitted relative to the window accumulation. Current options are:
- to emit on every event, or
- just when the collection is complete, or
- when a flush signal is received, either manually calling done(), or at the end of a bounded source.
The strings indicating how a Collection emit should be triggered can be:
- “eachEvent” - when a new event comes in, all currently maintained collections will emit their result.
- “discard” - when a collection is to be discarded, first it will emit. But only then.
- “flush” - when a flush signal is received.
The difference will depend on the output you want, how often you want to get updated, and if you need to get a partial state. There’s currently no support for late data or watermarks. If an event comes in after a collection window, that collection is considered finished.
Parameters: trigger (string) – See above Returns: The Pipeline Return type: Pipeline
-
fill
(field_spec=None, method='zero', fill_limit=None)¶ Take the data in this timeseries and “fill” any missing or invalid values. This could be setting None values to zero so mathematical operations will succeed, interpolate a new value, or pad with the previously given value.
Parameters: - field_spec (str, list, tuple, None, optional) –
Column or columns to look up. To retrieve multiple deep nested values, use a list like [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value may be given as a string.like.this.
If None, the default column field ‘value’ will be used.
- method (str, optional) – Filling method: zero | linear | pad
- fill_limit (None, optional) – Set a limit on the number of consecutive events will be filled before it starts returning invalid values. For linear fill, no filling will happen if the limit is reached before a valid value is found.
Returns: The Pipeline.
Return type: Pipeline
-
filter
(op)¶ Filter the event stream using an operator
Parameters: op (function) – A function that returns True or False Returns: The Pipeline Return type: Pipeline
-
from_source
(src)¶ Note: originally named from() in JS code.
The source to get events from. The source needs to be able to iterate its events using for..of loop for bounded Ins, or be able to emit() for unbounded Ins. The actual batch, or stream connection occurs when an output is defined with to().
Pipelines can be chained together since a source may be another Pipeline.
Parameters: src (Bounded, Stream or Pipeline) – The source for the Pipeline, or another Pipeline. Returns: The Pipeline. Return type: Pipeline
-
get_emit_on
()¶ Get the emit on (eachEvent, etc).
Returns: The emit on string (discards, flush, etc). Return type: str
-
get_group_by
()¶ Get the group by callback.
Returns: Returns the group by function. Return type: function
-
get_utc
()¶ Get the UTC state.
Returns: In UTC or not. Return type: bool
-
get_window_duration
()¶ Get the window duration.
Returns: A formatted window duration. Return type: str
-
get_window_type
()¶ Get the window type (global, etc).
Returns: The window type. Return type: str
-
group_by
(key=None)¶ Sets a new groupBy expression. Returns a new Pipeline.
Grouping is a state set on the Pipeline. Operations downstream of the group specification will use that state. For example, an aggregation would occur over any grouping specified.
The key to group by. You can pass in a function that takes an event as an arg and dynamically returns the group by key.
Otherwise key will be interpreted as a field_path:
- a single field name or deep.column.path, or
- an array-style field_path [‘deep’, ‘column’, ‘path’] to a single column.
This is not a list of multiple columns; it is the path to a single column to pull group by keys from. For example, for a column called ‘status’ that contains the values ‘OK’ and ‘FAIL’, the key would be ‘status’ and two collections, OK and FAIL, will be generated.
If key is None, then the default column ‘value’ will be used.
Parameters: key (function, list or string) – The key to group by. See above. Returns: The Pipeline Return type: Pipeline
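Grouping by a key column can be sketched with plain dict payloads (a stdlib illustration of the state the Pipeline maintains, not its implementation):

```python
from collections import defaultdict

def group_events(events, key='status'):
    """Partition event payloads into one collection per key value."""
    groups = defaultdict(list)
    for data in events:
        groups[data[key]].append(data)
    return dict(groups)

events = [{'status': 'OK', 'v': 1},
          {'status': 'FAIL', 'v': 2},
          {'status': 'OK', 'v': 3}]
group_events(events)  # two collections: 'OK' and 'FAIL'
```

Downstream operations such as aggregate() would then run once per group.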
-
input
()¶ Originally called in() in JS code.
-
map
(op)¶ Map the event stream using an operator.
Parameters: op (function) – A function that returns a new Event. Returns: The Pipeline. Return type: Pipeline
-
mode
()¶ Get the pipeline mode (ie: batch, stream).
Returns: The mode. Return type: str
-
offset_by
(offset_by, field_spec=None)¶ Processor to offset a set of fields by a value. Mostly used for testing processor and pipeline operations with a simple operation.
Parameters: - offset_by (int, float) – The amount to offset by.
- field_spec (str, list, tuple, None, optional) –
Column or columns to look up. To retrieve multiple deep nested values, use a list like [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value may be given as a string.like.this.
If None, the default ‘value’ column will be used.
Returns: The modified Pipeline.
Return type: Pipeline
-
rate
(field_spec=None, allow_negative=True)¶ derivative entry point
-
results_done
()¶ Set result state as done.
-
select
(field_spec=None)¶ Select a subset of columns.
Parameters: field_spec (str, list, tuple, None, optional) – Column or columns to look up. To retrieve multiple deep nested values, use a list like [‘can.be’, ‘done.with’, ‘this.notation’]; a single deep value may be given as a string.like.this.
If None, the default ‘value’ column will be used.
Returns: The Pipeline. Return type: Pipeline
-
take
(limit)¶ Take events up to the supplied limit, per key.
Parameters: - limit (int) – Integer number of events to take.
- global_flush (bool, optional) – If set to true (default is False) then the Taker will send out a single .flush() event if the limit has been exceeded and the window_type is ‘global.’ This can be used as a fail safe with processors that cache events (like the Filler) to ensure all events are emitted when the Pipeline is used in ‘stream’ mode. This is not needed in ‘batch’ mode because the flush signal is sent automatically.
Returns: The Pipeline.
Return type: Pipeline
-
to
(out, observer=None, options=<pypond.util.Options object>)¶ Sets up the destination sink for the pipeline.
For a batch mode connection, i.e. one with a Bounded source, the output is connected to a clone of the parts of the Pipeline dependencies that lead to this output. This is done by a Runner. The source input is then iterated over to process all events into the pipeline and through to the Out.
For stream mode connections, the output is connected and from then on any events added to the input will be processed down the pipeline to the out.
def cback(event):
    do_something_with_the_event(event)

timeseries = TimeSeries(IN_OUT_DATA)
(
    Pipeline()
    .from_source(timeseries)
    .emit_on('flush')
    .collapse(['in', 'out'], 'total', Functions.sum())
    .aggregate(dict(total=Functions.max()))
    .to(EventOut, cback)
)
NOTE: arg list has been changed from the ordering in the JS source to conform to python convention.
Parameters: - out (EventOut, CollectionOut, etc instance) – The output.
- observer (function or instance) – The observer.
- options (Options, optional) – Options.
Returns: The Pipeline.
Return type: Pipeline
-
to_event_list
()¶ Directly return the results from the processor rather than passing a callback in.
Returns: The _results attribute with events. Return type: list or dict
-
to_keyed_collections
()¶ Directly return the results from the processor rather than passing a callback in.
Returns: The _results attribute from a Pipeline object after processing. Will contain Collection objects. Return type: list or dict
-
window_by
(window_or_duration=None, utc=True)¶ Set the window, returning a new Pipeline. A new window will have a type and duration associated with it. Current available types are:
- fixed (e.g. every 5m)
- calendar based windows (e.g. every month)
Windows are a type of grouping. Typically you’d define a window on the pipeline before doing an aggregation or some other operation on the resulting grouped collection. You can combine window-based grouping with key-grouping (see groupBy()).
There are several ways to define a window. The general format is an options object containing a type field and a duration field.
Currently the only accepted type is fixed, but others are planned. For duration, this is a duration string, for example “30s” or “1d”. Supported are: seconds (s), minutes (m), hours (h) and days (d).
The argument here is either a string or an object with string attrs type and duration. The arg can be either a window or a duration.
If no arg is supplied or set to None, the window_type is set to ‘global’ and there is no duration.
There is also a short-cut notation for a fixed window or a calendar window. Simply supplying the duration string (“30s” for example) will result in a fixed window type with the supplied duration.
Window window_or_duration may be:
- A fixed interval duration (see next): “fixed”
- A calendar interval: “daily,” “monthly” or “yearly”
Duration is of the form:
- “30s” or “1d” etc - supports seconds (s), minutes (m), hours (h), days (d). When duration is passed as the arg, window_type is set to ‘fixed’.
Parameters: - window_or_duration (string, Capsule) – See above.
- utc (bool) – How to render the aggregations - in UTC vs. the user’s local time. Cannot be set to False if using a fixed window size.
Returns: The Pipeline.
Return type: Pipeline
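A plain-Python sketch (not part of pypond) of how a fixed-window duration string such as "30s" or "1h" maps an epoch-ms timestamp to a window; the helper names are hypothetical:

```python
# Hypothetical helpers illustrating fixed windows; not the pypond implementation.
def parse_duration_ms(duration):
    """Convert a duration string like '30s', '5m', '1h' or '1d' to ms."""
    units = {'s': 1000, 'm': 60 * 1000, 'h': 3600 * 1000, 'd': 86400 * 1000}
    return int(duration[:-1]) * units[duration[-1]]

def fixed_window_index(timestamp_ms, duration):
    """Fixed windows are aligned relative to Jan 1st, 1970 UTC."""
    return timestamp_ms // parse_duration_ms(duration)
```

Every event whose timestamp yields the same index falls into the same fixed window, which is why fixed windows are best suited to UTC-based aggregation.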
-
class
pypond.pipeline.
Runner
(pline, output)¶ Bases:
pypond.bases.PypondBase
A runner is used to extract the chain of processing operations from a Pipeline given an Output. The idea here is to traverse back up the Pipeline(s) and build an execution chain.
When the runner is started, events from the “in” are streamed into the execution chain and output into the “out”.
Rebuilding in this way enables us to handle connected pipelines:
in --> pipeline ---.
                   |----pipeline ---| -> out
The runner breaks this into the following for execution:
_input        - the "in" or from() bounded input of the upstream pipeline
_processChain - the process nodes in the pipelines leading to the out
_output       - the supplied output destination for the batch process
NOTE: There’s no current way to merge multiple sources, though a time series has a TimeSeries.merge() static method for this purpose.
Parameters: - pipeline (Pipeline) – The pipeline to run.
- output (PipelineOut) – The output driving this runner
-
start
(force=False)¶ Start the runner
Parameters: - force (bool, optional) – Force a flush at the end of the batch source to cause any buffers to emit.
-
pypond.pipeline.
default_callback
(*args)¶ Default no-op callback for group_by in the Pipeline constructor.
pypond.range module¶
Implementation of Pond TimeRange classes.
http://software.es.net/pond/#/timerange
-
class
pypond.range.
TimeRange
(instance_or_begin, end=None)¶ Bases:
pypond.range.TimeRangeBase
Builds a new TimeRange. First arg may be of several different formats:
- Another TimeRange (copy constructor)
- A python tuple, list or pyrsistent.PVector object containing two python datetime objects or ms timestamps.
- Two arguments, begin and end, each of which may be a datetime object, or a ms timestamp.
Parameters: - instance_or_begin (TimeRange, iterable, int or datetime.datetime.) – See above for variations.
- end (int or datetime.datetime, optional) – Optional arg for the end of the time range.
Raises: TimeRangeException
– Raised to indicate errors with args.-
__str__
()¶ string repr method.
Returns: String repr method. Return type: str
-
begin
()¶ Returns the begin time of the TimeRange.
Returns: The begin time. Return type: datetime.datetime
-
contains
(other)¶ Returns true if other is completely inside this.
Parameters: other (TimeRange) – Another time range object. Returns: Returns true if other range is completely inside this one. Return type: bool
-
disjoint
(other)¶ Returns true if the passed in other Range in no way overlaps this time Range.
Parameters: other (TimeRange) – Another time range object. Returns: Returns true if other range in no way overlaps this one. Return type: bool
-
duration
()¶ Return the duration of the range in milliseconds.
Returns: Duration in ms. Return type: int
-
end
()¶ Returns the end time of the TimeRange.
Returns: The end time. Return type: datetime.datetime
-
equals
(other)¶ Returns if the two TimeRanges can be considered equal, in that they have the same times.
Parameters: other (TimeRange) – Another time range object Returns: True if both object represent the same time range. Return type: bool
-
extents
(other)¶ Returns a new TimeRange which covers the extents of this and other combined.
Parameters: other (TimeRange) – Another time range object Returns: New time range which covers the extents of this and the other range combined. Return type: TimeRange
-
humanize
()¶ Returns a human friendly version of the TimeRange, e.g. “Aug 1, 2014 05:19:59 am to Aug 1, 2014 07:41:06 am”
This displays in local time, so don’t freak out.
Returns: Human friendly time range string. Return type: str
-
humanize_duration
()¶ Humanize the duration.
Returns: Humanized duration string. Return type: str
-
intersection
(other)¶ Returns a new TimeRange which represents the intersection (overlapping) part of this and other.
Parameters: other (TimeRange) – Another time range object. Returns: A new time range object representing the intersection (overlapping) part of this and the other. Return type: TimeRange
-
static
last_day
()¶ Generate a time range spanning the last 24 hours.
Returns: A new time range object of the requested duration. Return type: TimeRange
-
static
last_month
()¶ Generate a time range spanning the last month.
Returns: A new time range object of the requested duration. Return type: TimeRange
-
static
last_ninety_days
()¶ Generate a time range spanning the last 90 days.
Returns: A new time range object of the requested duration. Return type: TimeRange
-
static
last_seven_days
()¶ Generate a time range spanning the last 7 days.
Returns: A new time range object of the requested duration. Return type: TimeRange
-
static
last_thirty_days
()¶ Generate a time range spanning the last 30 days.
Returns: A new time range object of the requested duration. Return type: TimeRange
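The last_* helpers above all follow the same pattern. A stdlib sketch (not the pypond implementation; the helper name is hypothetical) of what they compute - an aware begin/end pair ending now:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical helper showing the shared pattern of the last_* statics:
# an aware (begin, end) pair spanning the requested duration, ending now.
def last_n_days(n):
    end = datetime.now(timezone.utc)
    return (end - timedelta(days=n), end)

begin, end = last_n_days(30)
```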
-
overlaps
(other)¶ Returns true if the passed in other TimeRange overlaps this time Range.
Parameters: other (TimeRange) – Another time range object. Returns: Returns true if other range overlaps this one. Return type: bool
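The interval semantics behind contains(), disjoint(), intersection() and overlaps() can be sketched in plain Python, with (begin, end) epoch-ms tuples standing in for TimeRange objects (these helpers are illustrative, not pypond's implementation):

```python
# Hypothetical tuple-based sketch of TimeRange's interval predicates.
def overlaps(a, b):
    # True if the ranges share any part of the timeline.
    return a[0] <= b[1] and b[0] <= a[1]

def contains(a, b):
    # True if b lies completely inside a.
    return a[0] <= b[0] and b[1] <= a[1]

def disjoint(a, b):
    # True if the ranges in no way overlap.
    return not overlaps(a, b)

def intersection(a, b):
    # The overlapping part of the two ranges, or None if disjoint.
    if disjoint(a, b):
        return None
    return (max(a[0], b[0]), min(a[1], b[1]))
```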
-
range
()¶ Returns the internal range, which is an Immutable List containing begin and end values.
Returns: Immutable list containing the range. Return type: pyrsistent.pvector
-
relative_string
()¶ Returns a human friendly version of the TimeRange, e.g. “a few seconds ago to a month ago”
Returns: Another human friendly duration string. Return type: str
-
set_begin
(dtime)¶ Sets a new begin time on the TimeRange. The result will be a new TimeRange.
Parameters: dtime (datetime.datetime) – New time range boundary. Returns: A new time range object reflecting the new range bounds. Return type: TimeRange Raises: TimeRangeException
– Raised on invalid arg.
-
set_end
(dtime)¶ Sets a new end time on the TimeRange. The result will be a new TimeRange.
Parameters: dtime (datetime.datetime) – New time range boundary. Returns: A new time range object reflecting the new range bounds. Return type: TimeRange Raises: TimeRangeException
– Raised on invalid arg.
-
to_json
()¶ Returns the TimeRange as a python list of two ms timestamps.
Returns: List of two timestamps. Return type: list
-
to_local_string
()¶ Returns the TimeRange as a string expressed in local time.
Returns: Timerange as a string. Return type: str
-
to_string
()¶ Returns the TimeRange as a string, useful for serialization.
Returns: String representation of the range. Return type: str
-
to_utc_string
()¶ Returns the TimeRange as a string expressed in UTC time.
Returns: Timerange as string. Return type: str
-
class
pypond.range.
TimeRangeBase
¶ Bases:
pypond.bases.PypondBase
Base for TimeRange
-
static
awareness_check
(dtime)¶ Check input to make sure datetimes are aware. Presumes an iterable containing datetimes, but will fail over to process a single datetime object via duck typing.
Parameters: dtime (list, tuple or pvector, but will fail over to datetime.) – An iterable of datetime objects Raises: TimeRangeException
– Raised if a non-aware datetime object is found.
-
static
sanitize_list_input
(list_type)¶ Validate input when a pvector, list or tuple is passed in as a constructor arg.
Parameters: list_type (list, tuple or pvector) – Iterable containing args (epoch ms or datetime) that was passed to the constructor. Returns: Immutable list-like object with two elements - the beginning and ending datetime of the range. Return type: pyrsistent.pvector Raises: TimeRangeException
– Raised if bad args have been passed in.
-
static
validate_range
(range_obj)¶ Make sure that the end time is not chronologically before the begin.
Raises: TimeRangeException Parameters: range_obj (pyrsistent.pvector) – The internal begin/end immutable range object. Raises: TimeRangeException
– Raised if end arg is earlier in time than begin.
-
pypond.series module¶
Implements the Pond TimeSeries class.
http://software.es.net/pond/#/timeseries
-
class
pypond.series.
TimeSeries
(instance_or_wire)¶ Bases:
pypond.bases.PypondBase
A TimeSeries is a Series where each event is an association of a timestamp and some associated data.
Data passed into it may have the following format, which is our wire format
{
    "name": "traffic",
    "columns": ["time", "value", ...],
    "points": [
        [1400425947000, 52, ...],
        [1400425948000, 18, ...],
        [1400425949000, 26, ...],
        [1400425950000, 93, ...],
        ...
    ]
}
Alternatively, the TimeSeries may be constructed from a list of Event objects.
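The wire format pairs a "columns" list with rows of "points". A plain-Python sketch of how each point row maps to an event's data (illustrative only; pypond builds Event objects rather than dicts):

```python
# The wire format: a "columns" header plus rows of "points".
wire = {
    "name": "traffic",
    "columns": ["time", "value"],
    "points": [
        [1400425947000, 52],
        [1400425948000, 18],
    ],
}

# Each point row zips against the column names to form an event's data.
events = [dict(zip(wire["columns"], point)) for point in wire["points"]]
```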
Internally the above series is represented as two parts:
- Collection - an Immutable.List of Events and associated methods to query and manipulate that list
- Meta data - an Immutable.Map of extra data associated with the TimeSeries
The events stored in the collection may be Events (timestamp based), TimeRangeEvents (time range based) or IndexedEvents (an alternative form of a time range, such as “2014-08” or “1d-1234”)
The timerange associated with a TimeSeries is simply the bounds of the events within it (i.e. the min and max times).
Initialize a TimeSeries object from:
- Another TimeSeries/copy ctor
- An event list
- From the wire format
Parameters: instance_or_wire (TimeSeries, list of events, wire format) – See above Raises: TimeSeriesException
– Raised when args can not be properly handled.-
event_type_map
¶ dict – Map text keys from wire format to the appropriate Event class.
-
__str__
()¶ call to_string()
-
aggregate
(func, field_path=None)¶ Aggregates the events down using a user defined function to do the reduction.
Parameters: - func (function) – Function to pass to map reduce to aggregate.
- field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
Returns: Dict of reduced values
Return type: dict
-
align
(field_spec=None, window='5m', method='linear', limit=None)¶ Align entry point
-
at
(i)¶ Access the series events via numeric index
Parameters: i (int) – An array index Returns: The Event object found at index i Return type: Event
-
at_first
()¶ Return first event in the series
Returns: The first event in the series. Return type: Event
-
at_time
(time)¶ Return an event in the series by its time. This is the same as calling bisect first and then using at with the index.
Parameters: time (datetime.datetime) – A datetime object Returns: The event at the designated time. Return type: Event
-
avg
(field_spec=None, filter_func=None)¶ Get avg
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Average value
Return type: int or float
-
begin
()¶ Gets the earliest time represented in the TimeSeries.
Returns: The begin time of the underlying time range. Return type: datetime.datetime
-
begin_timestamp
()¶ Gets the earliest time represented in the TimeSeries in epoch ms.
Returns: The begin time of the underlying time range in epoch ms. Return type: int
-
bisect
(dtime, b=0)¶ Finds the index that is just less than the time t supplied. In other words every event at the returned index or less has a time before the supplied t, and every sample after the index has a time later than the supplied t.
Optionally supply a begin index to start searching from. Returns index that is the greatest but still below t.
Parameters: - dtime (datetime.datetime) – Date time object to search with
- b (int, optional) – An index position to start searching from.
Returns: The index of the Event searched for by dtime.
Return type: int
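The documented search - the greatest index whose event time is still at or below the supplied time - maps directly onto the stdlib bisect module. A sketch over a sorted list of timestamps (equality handling here is an assumption, not taken from the pypond source):

```python
import bisect

# Hypothetical sketch of the bisect() semantics over sorted timestamps:
# return the greatest index whose timestamp is at or below t.
def series_bisect(timestamps, t, b=0):
    # bisect_right gives the insertion point after any equal entries;
    # subtracting one yields the last index <= t (-1 if none qualify).
    return bisect.bisect_right(timestamps, t, lo=b) - 1
```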
-
static
build_metadata
(meta)¶ Build the metadata out of the incoming wire format
Parameters: meta (dict) – Incoming wire format. Returns: Immutable dict of metadata Return type: pyrsistent.pmap
-
clean
(field_path=None)¶ Returns a new TimeSeries by testing the field_path values for being valid (not NaN, null or undefined). The resulting TimeSeries will be clean for that fieldSpec.
Parameters: field_path (str, list, tuple, None, optional) – Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
Returns: New time series from clean values from the field spec. Return type: TimeSeries
-
collapse
(field_spec_list, name, reducer, append=True)¶ Takes a fieldSpecList (list of column names) and collapses them to a new column which is the reduction of the matched columns in the fieldSpecList.
Parameters: - field_spec_list (list) – List of columns to collapse. If you need to retrieve deep nested values, use notation like [‘can.be’, ‘done.with’, ‘this.notation’].
- name (str) – Name of new column containing collapsed values.
- reducer (function) – Function to pass to the reducer.
- append (bool, optional) – Append collapsed column to existing data or fresh data payload.
Returns: A new time series from the collapsed columns.
Return type: TimeSeries
-
collect_by_fixed_window
(window_size)¶ Collect the series events into Collection objects grouped by fixed window.
Parameters: window_size (str) – The window size - 1d, 6h, etc Returns: Returns the _results attribute from a Pipeline object after processing. Will contain Collection objects. Return type: list or dict
-
collection
()¶ Returns the internal collection of events for this TimeSeries
Returns: Internal collection. Return type: Collection
-
columns
()¶ create a list of the underlying columns.
Due to the nature of the event data and using dicts, the order of the column list might be somewhat unpredictable. When generating points, this is solved by passing the column list to .to_point() as an optional argument to ensure that the points and the columns are properly aligned.
Returns: List of column names. Return type: list
-
count
()¶ alias for size.
Returns: Number of rows in series. Return type: int
-
crop
(timerange)¶ Crop the TimeSeries to the specified TimeRange and return a new TimeSeries
Parameters: timerange (TimeRange) – Bounds of the new TimeSeries Returns: The new cropped TimeSeries instance. Return type: TimeSeries
-
daily_rollup
(aggregation, to_events=False, utc=False)¶ Builds a new TimeSeries by dividing events into days. The days are in either local or UTC time, depending on if utc(true) is set on the Pipeline.
Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:
{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}
will aggregate both the “in” and “out” columns, performing avg and max aggregations on them across all events within each day; the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.
Example:
timeseries = TimeSeries(data)
daily_max_temp = timeseries.daily_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)
This helper function defaults to rendering the aggregations in localtime. The reason is that rendering in localtime makes the most sense on the client side, e.g. when rendering a timeseries chart; a chart rendered in UTC might not make much sense to the user.
Since this is now also used in server-side applications, the optional arg utc can be set to True to force it to render in UTC instead.
It is probably best to favor .fixed_window_rollup() when you want to render in UTC.
Parameters: - aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
- to_event (bool, optional) – Do conversion to Event objects
Returns: The resulting rolled up TimeSeries.
Return type: TimeSeries
-
end
()¶ Gets the latest time represented in the TimeSeries.
Returns: The end time of the underlying time range. Return type: datetime.datetime
-
end_timestamp
()¶ Gets the latest time represented in the TimeSeries in epoch ms.
Returns: The end time of the underlying time range in epoch ms. Return type: int
-
static
equal
(series1, series2)¶ Check equality - same instance.
Parameters: - series1 (TimeSeries) – A time series
- series2 (TimeSeries) – Another time series
Returns: Are the two the same instance?
Return type: bool
-
event_type_map
= {'index': <class 'pypond.indexed_event.IndexedEvent'>, 'time': <class 'pypond.event.Event'>, 'timerange': <class 'pypond.timerange_event.TimeRangeEvent'>}
-
events
()¶ Generator to allow for..of loops over series.events()
Returns: Generator for loops. Return type: iterator
-
fill
(field_spec=None, method='zero', fill_limit=None)¶ Take the data in this timeseries and “fill” any missing or invalid values. This could be setting None values to zero so mathematical operations will succeed, interpolate a new value, or pad with the previously given value.
Parameters: - field_spec (str, list, tuple, None, optional) –
Column or columns to look up. If you need to retrieve multiple deep nested values, use notation like [‘can.be’, ‘done.with’, ‘this.notation’]. A single deep value can be expressed with a string.like.this.
If None, the default column field ‘value’ will be used.
- method (str, optional) – Filling method: zero | linear | pad
- fill_limit (None, optional) – Set a limit on the number of consecutive events that will be filled before it starts returning invalid values. For linear fill, no filling will happen if the limit is reached before a valid value is found.
Returns: A clone of this TimeSeries with a new Collection generated by the fill operation.
Return type: TimeSeries
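The three documented fill methods can be sketched over a plain list of values with None holes (illustrative only; fill_limit is omitted and the function name is hypothetical):

```python
# Hypothetical sketch of the zero | pad | linear fill methods.
def fill(values, method='zero'):
    out = list(values)
    if method == 'zero':
        # Replace every missing value with 0.
        return [0 if v is None else v for v in out]
    if method == 'pad':
        # Carry the previous valid value forward.
        for i in range(1, len(out)):
            if out[i] is None:
                out[i] = out[i - 1]
        return out
    if method == 'linear':
        # Interpolate between the surrounding valid values.
        for i, v in enumerate(out):
            if v is None:
                j, k = i - 1, i + 1
                while k < len(out) and out[k] is None:
                    k += 1
                if j >= 0 and out[j] is not None and k < len(out):
                    out[i] = out[j] + (out[k] - out[j]) * (i - j) / (k - j)
        return out
    raise ValueError('unknown method: ' + method)
```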
-
fixed_window_rollup
(window_size, aggregation, to_events=False)¶ Builds a new TimeSeries by dividing events within the TimeSeries across multiple fixed windows of size windowSize.
Note that these are windows defined relative to Jan 1st, 1970, and are UTC, so this is best suited to smaller window sizes (hourly, 5m, 30s, 1s etc), or in situations where you don’t care about the specific window, just that the data is smaller.
Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:
{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}
will aggregate both the “in” and “out” columns, performing avg and max aggregations on them across all events within each window; the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.
Example:
timeseries = TimeSeries(data)
daily_avg = timeseries.fixed_window_rollup(
    '1d', {'value_avg': {'value': Functions.avg()}}
)
Parameters: - window_size (str) – The size of the window, e.g. ‘6h’ or ‘5m’
- aggregation (Options) – The aggregation specification
- to_events (bool, optional) – Convert to events
Returns: The resulting rolled up TimeSeries
Return type: TimeSeries
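The windowing-plus-aggregation-spec idea can be sketched in plain Python, mirroring the spec shape above (output column -> field -> function); this is an illustrative toy, not pypond's implementation:

```python
# Hypothetical mini-rollup over fixed windows aligned to the epoch.
def rollup(events, window_ms, spec):
    # Bucket events by fixed window index.
    windows = {}
    for ev in events:
        windows.setdefault(ev['time'] // window_ms, []).append(ev)
    # Apply the aggregation spec to each window's events.
    result = {}
    for idx, evs in sorted(windows.items()):
        row = {}
        for out_col, mapping in spec.items():
            for field, func in mapping.items():
                row[out_col] = func([e[field] for e in evs])
        result[idx] = row
    return result

# Two events in the first hourly window, one in the second.
sample = [{'time': 0, 'in': 1}, {'time': 1000, 'in': 3}, {'time': 3600000, 'in': 5}]
rolled = rollup(sample, 3600 * 1000, {'in_max': {'in': max}})
```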
-
hourly_rollup
(aggregation, to_events=False)¶ Builds a new TimeSeries by dividing events into hours. The hours are in either local or UTC time, depending on if utc(true) is set on the Pipeline.
Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:
{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}
will aggregate both the “in” and “out” columns, performing avg and max aggregations on them across all events within each hour; the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.
Example:
timeseries = TimeSeries(data)
hourly_max_temp = timeseries.hourly_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)
Parameters: - aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
- to_event (bool, optional) – Do conversion to Event objects
Returns: The resulting rolled up TimeSeries.
Return type: TimeSeries
-
index_as_range
()¶ Index returned as time range.
Returns: Index as a TimeRange or None Return type: TimeRange
-
index_as_string
()¶ Index represented as a string.
Returns: String format of Index or None. Return type: str
-
is_utc
()¶ Get data utc.
-
map
(op)¶ Takes an operator that is used to remap events from this TimeSeries to a new set of Events. The result is returned via the callback.
Parameters: op (function) – An operator which will be passed each event and which should return a new event. Returns: A clone of this TimeSeries with a new Collection generated by the map operation. Return type: TimeSeries
-
max
(field_path=None, filter_func=None)¶ Get max
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Max value
Return type: int or float
-
mean
(field_path=None, filter_func=None)¶ Get mean
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Mean value
Return type: int or float
-
median
(field_path=None, filter_func=None)¶ Get median
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Median value
Return type: int or float
-
meta
(key=None)¶ Returns the meta data about this TimeSeries as a JSON object
Parameters: key (str, optional) – Optional metadata key to fetch value for Returns: Return a thawed metadata dict or the value specified by key. Return type: dict or key/value
-
min
(field_path=None, filter_func=None)¶ Get min
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Min value
Return type: int or float
-
monthly_rollup
(aggregation, to_events=False, utc=False)¶ Builds a new TimeSeries by dividing events into months. The months are in either local or UTC time, depending on if utc(true) is set on the Pipeline.
Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:
{
    'in_avg': {'in': Functions.avg()},
    'out_avg': {'out': Functions.avg()},
    'in_max': {'in': Functions.max()},
    'out_max': {'out': Functions.max()},
}
will aggregate both the “in” and “out” columns, performing avg and max aggregations on them across all events within each month; the results will be put into the 4 new columns in_avg, out_avg, in_max and out_max.
Example:
timeseries = TimeSeries(data)
monthly_max_temp = timeseries.monthly_rollup(
    {'max_temp': {'temperature': Functions.max()}}
)
This helper function defaults to rendering the aggregations in localtime. The reason is that rendering in localtime makes the most sense on the client side, e.g. when rendering a timeseries chart; a chart rendered in UTC might not make much sense to the user.
Since this is now also used in server-side applications, the optional arg utc can be set to True to force it to render in UTC instead.
It is probably best to favor .fixed_window_rollup() when you want to render in UTC.
Parameters: - aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
- to_event (bool, optional) – Do conversion to Event objects
Returns: The resulting rolled up TimeSeries.
Return type: TimeSeries
-
name
()¶ Get data name.
Returns: Data name. Return type: str
-
percentile
(perc, field_path, method='linear', filter_func=None)¶ Gets percentile perc within the Collection. Numpy under the hood.
Parameters: - perc (int) – The percentile (should be between 0 and 100)
- field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
- method (str, optional) –
Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:
linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.
lower: i
higher: j
nearest: i or j whichever is nearest
midpoint: (i + j) / 2
Returns: The percentile.
Return type: int or float
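The interpolation options listed above follow the i/j/fraction description; a plain-Python sketch (pypond itself delegates to numpy, and this helper name is hypothetical):

```python
# Hypothetical sketch of the documented interpolation methods.
def pctl(values, perc, method='linear'):
    data = sorted(values)
    idx = (len(data) - 1) * perc / 100.0   # fractional rank
    i = int(idx)
    fraction = idx - i
    if fraction == 0:
        return data[i]                     # falls exactly on a data point
    j = i + 1
    if method == 'linear':
        return data[i] + (data[j] - data[i]) * fraction
    if method == 'lower':
        return data[i]
    if method == 'higher':
        return data[j]
    if method == 'nearest':
        return data[j] if fraction >= 0.5 else data[i]
    if method == 'midpoint':
        return (data[i] + data[j]) / 2
    raise ValueError('unknown method: ' + method)
```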
-
pipeline
()¶ Returns a new Pipeline with input source being initialized to this TimeSeries collection. This allows pipeline operations to be chained directly onto the TimeSeries to produce a new TimeSeries or Event result.
Returns: New pipline. Return type: Pipeline
-
quantile
(num, field_path=None, method='linear')¶ Gets num quantiles within the Collection
Parameters: - num (int) – Number of quantiles to divide the Collection into.
- field_path (None, optional) – The field to return as the quantile. If not set, defaults to ‘value.’
- method (str, optional) –
Specifies the interpolation method to use when the desired percentile lies between two data points. Options are:
linear: i + (j - i) * fraction, where fraction is the fractional part of the index surrounded by i and j.
lower: i
higher: j
nearest: i or j whichever is nearest
midpoint: (i + j) / 2
Returns: An array of quantiles
Return type: list
-
range
()¶ Alias for timerange()
Returns: The TimeRange internal to the underlying collection. Return type: TimeRange
-
rate
(field_spec=None, allow_negative=True)¶ Rate of change (derivative) entry point.
-
rename_columns
(rename_map)¶ TimeSeries.map() helper function to rename columns in the underlying events.
Takes a dict of columns to rename:
new_ts = ts.rename_columns({'in': 'new_in', 'out': 'new_out'})
Returns a new time series containing new events. Columns not in the dict will be retained and not renamed.
NOTE: as the name implies, this will only rename the main “top level” (ie: non-deep) columns. If you need more extravagant renaming, roll your own using map().
Parameters: rename_map (dict) – Dict of columns to rename. Returns: A clone of this TimeSeries with a new Collection generated by the map operation. Return type: TimeSeries
-
static
same
(series1, series2)¶ Implements JS Object.is() - same values
Parameters: - series1 (TimeSeries) – A time series
- series2 (TimeSeries) – Another time series
Returns: Do the two have the same values?
Return type: bool
-
select
(field_spec=None)¶ call select on the pipeline.
Parameters: field_spec (str, list, tuple, None, optional) – Column or columns to look up. If you need to retrieve multiple deep nested values, use notation like [‘can.be’, ‘done.with’, ‘this.notation’]. A single deep value can be expressed with a string.like.this.
If None, the default ‘value’ column will be used.
Returns: A clone of this TimeSeries with a new Collection generated by the select operation. Return type: TimeSeries
-
set_collection
(coll)¶ Sets a new underlying collection for this TimeSeries.
Parameters: coll (Collection) – New collection to assign to this TimeSeries Returns: New TimeSeries with Collection coll Return type: TimeSeries
-
set_meta
(key, value)¶ Change the metadata of the TimeSeries
Parameters: - key (str) – The metadata key
- value (obj) – The value
Returns: A new TimeSeries with new metadata.
Return type: TimeSeries
-
set_name
(name)¶ Set name and generate a new TimeSeries
Parameters: name (str) – New name Returns: Return a TimeSeries with a new name. Return type: TimeSeries
-
size
()¶ Number of rows in series.
Returns: Number in the series. Return type: int
-
size_valid
(field_path)¶ Returns the number of valid items in this collection.
Uses the fieldSpec to look up values in all events. It then counts the number that are considered valid, i.e. are not NaN, undefined or null.
Parameters: field_path (str, list, tuple, None, optional) – Name of value to look up. If None, defaults to [‘value’]. “Deep” syntax either [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value.’
If field_path is None, then [‘value’] will be the default.
Returns: Number of valid <field_path> values in the events. Return type: int
-
slice
(begin, end)¶ Perform a slice of events within the TimeSeries, returns a new TimeSeries representing a portion of this TimeSeries from begin up to but not including end. Uses typical python [slice:syntax].
Parameters: - begin (int) – Slice begin index
- end (int) – Slice end index
Returns: New instance with sliced collection.
Return type: TimeSeries
-
stdev
(field_path=None, filter_func=None)¶ Get the standard deviation.
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax can be given as [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’.
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Standard deviation
Return type: int or float
-
sum
(field_path=None, filter_func=None)¶ Get the sum of values.
Parameters: - field_path (str, list, tuple, None, optional) –
Name of a single value to look up. If None, defaults to [‘value’]. “Deep” syntax can be given as [‘deep’, ‘value’], (‘deep’, ‘value’,) or ‘deep.value’.
If field_path is None, then [‘value’] will be the default.
- filter_func (function, None) – A function (static method really) from the Filters class in module pypond.functions.Filters. It will control how bad or missing (None, NaN, empty string) values will be cleansed or filtered during aggregation. If no filter is specified, then the missing values will be retained which will potentially cause errors.
Returns: Summed values
Return type: int or float
-
timerange
()¶ Returns the extents of the TimeSeries as a TimeRange.
Returns: TimeRange internal to the underlying collection. Return type: TimeRange
-
static
timeseries_list_merge
(data, series_list)¶ Merge a list of time series.
Parameters: - data (dict or pvector) – Metadata to set in new TimeSeries.
- series_list (list) – List of TimeSeries instances.
Returns: New TimeSeries from merge.
Return type: TimeSeries
-
static
timeseries_list_reduce
(data, series_list, reducer, field_spec=None)¶ Reduces a list of TimeSeries objects using a reducer function. This works by taking each event in each TimeSeries and collecting them together based on timestamp. All events for a given time are then merged together using the reducer function to produce a new Event. Those Events are then collected together to form a new TimeSeries.
Parameters: - data (dict or pmap) – Metadata to set in new TimeSeries.
- series_list (list) – List of TimeSeries objects.
- reducer (function) – reducer function
- field_spec (list, str, None, optional) –
Column or columns to look up. To retrieve multiple deep nested values, use a list like [‘can.be’, ‘done.with’, ‘this.notation’]. A single deep value can be specified with a string like ‘deep.value’.
Can be set to None if the reducer does not require a field spec.
Returns: New time series containing the mapped events.
Return type: TimeSeries
-
static
timeseries_list_sum
(data, series_list, field_spec)¶ Takes a list of TimeSeries and sums them together to form a new TimeSeries.
ts1 = TimeSeries(weather1) ts2 = TimeSeries(weather2) total = TimeSeries.timeseries_list_sum(dict(name='sum'), [ts1, ts2], ['temp'])
Parameters: - data (dict) – Metadata to set in new TimeSeries.
- series_list (list) – List of TimeSeries objects
- field_spec (list, str, None, optional) – Column or columns to look up. To retrieve multiple deep nested values, use a list like [‘can.be’, ‘done.with’, ‘this.notation’]. A single deep value can be specified with a string like ‘deep.value’. If None, all columns will be operated on.
Returns: New time series with summed values.
Return type: TimeSeries
-
to_json
()¶ Returns the TimeSeries as a python dict.
This is actually like json.loads(s) - produces the actual vanilla data structure.
Returns: Dictionary of columns and points Return type: dict
-
to_string
()¶ Returns the TimeSeries as a string, useful for serialization.
In JS land, this is synonymous with __str__ or __unicode__
Returns: String version of to_json() for transmission/etc. Return type: str
-
yearly_rollup
(aggregation, to_events=False, utc=False)¶ Builds a new TimeSeries by dividing events into years. The years are in either local or UTC time, depending on whether utc=True is set.
Each window then has an aggregation specification applied as aggregation. This specification describes a mapping of output columns to fieldNames to aggregation functions. For example:
{ 'in_avg': {'in': Functions.avg()}, 'out_avg': {'out': Functions.avg()}, 'in_max': {'in': Functions.max()}, 'out_max': {'out': Functions.max()}, }
will perform avg and max aggregations on the “in” and “out” columns, across all events within each year, and put the results into the 4 new columns in_avg, out_avg, in_max and out_max.
Example:
timeseries = TimeSeries(data) yearly_max_temp = timeseries.yearly_rollup( {'max_temp': {'temperature': Functions.max()}} )
This helper function defaults to rendering the aggregations in localtime. The reason for this is that rendering in localtime makes the most sense on the client side, such as when rendering a timeseries chart; a chart rendered in UTC might not make much sense to the user.
Since this is now being used in server-side applications, the optional arg utc can be set to True to force rendering in UTC instead.
Probably best to favor using .fixed_window_rollup() when wanting to render in UTC.
Parameters: - aggregation (dict) – The aggregation specification e.g. {‘max_temp’: {‘temperature’: Functions.max()}}
- to_events (bool, optional) – Do conversion to Event objects
- utc (bool, optional) – Render the aggregations in UTC rather than localtime.
Returns: The resulting rolled up TimeSeries.
Return type: TimeSeries
pypond.timerange_event module¶
TimeRangeEvent associates data with a specific time range rather than at a discrete time like Event does.
-
class
pypond.timerange_event.
TimeRangeEvent
(instance_or_args, arg2=None)¶ Bases:
pypond.event.EventBase
The creation of a TimeRangeEvent is done by combining two parts: the timerange and the data.
To construct you specify a TimeRange, along with the data.
The first arg can be:
- a TimeRangeEvent instance (copy ctor)
- a pyrsistent.PMap, or
- a python tuple, list or pyrsistent.PVector object containing two python datetime objects or ms timestamps - the args for the TimeRange object.
To specify the data you can supply either:
- a python dict
- a pyrsistent.PMap, or
- a simple type such as an integer. In the case of the simple type this is a shorthand for supplying {“value”: v}.
Parameters: - instance_or_args (TimeRange, iterable, pyrsistent.pmap) – See above
- arg2 (dict, pmap, int, float, str, optional) – See above.
-
begin
()¶ The begin time of this Event, which is the beginning of the range.
Returns: Beginning of range. Return type: datetime.datetime
-
end
()¶ The end time of this Event, which is the end of the range.
Returns: End of range. Return type: datetime.datetime
-
humanize_duration
()¶ Humanize the timerange.
Returns: Humanized string of the time range. Return type: str
-
key
()¶ Returns a range string in the format ‘begin,end’ as expressed as ms since the epoch.
Returns: The begin and end of the timerange in ms since the epoch. Return type: str
-
set_data
(data)¶ Sets the data portion of the event and returns a new TimeRangeEvent.
Parameters: data (dict) – New payload to set as the data for this event.
Returns: A new TimeRangeEvent object with the new data payload.
Return type: TimeRangeEvent
-
timerange
()¶ The TimeRange of this data.
Returns: The underlying time range object. Return type: TimeRange
-
timerange_as_local_string
()¶ The timerange of this data, in Local time, as a string.
Returns: Formatted time string. Return type: str
-
timerange_as_utc_string
()¶ The timerange of this data, in UTC time, as a string.
Returns: Formatted time string Return type: str
-
timestamp
()¶ The timestamp of this Event data. It’s just the beginning of the range in this case.
Returns: Beginning of range. Return type: datetime.datetime
-
to_json
()¶ Returns the TimeRangeEvent as a JSON object, essentially:
{timerange: tr, data: {key: value, ...}}
This is actually like json.loads(s) - produces the actual data structure from the object internal data.
Returns: Dict representation of internals (timerange, data). Return type: dict
-
to_point
(cols=None)¶ Returns a flat array starting with the timestamp, followed by the values.
Can be given an optional list of columns so the returned list will have the values in order. Primarily for the TimeSeries wire format.
Parameters: cols (list, optional) – List of data columns to order the data points in so the TimeSeries wire format lines up correctly. If not specified, the points will be whatever order that dict.values() decides to return it in. Returns: Epoch ms followed by points. Return type: list
-
type
()¶ Return the type of this event.
Returns: The class of this event type. Return type: class
pypond.util module¶
Various utilities for the pypond code. Primarily functions to take care of consistent handling and conversion of time values as we are trying to traffic in aware datetime objects in UTC time.
Additionally some boolean test functions and assorted other utility functions.
-
class
pypond.util.
Capsule
(**kwargs)¶ Bases:
pypond.util.Options
Straight subclass of Options so there is no confusion between this and the pipeline Options. Employing this to mimic the Javascript Object in cases where using a Python dict would cause confusion porting the code.
-
class
pypond.util.
ObjectEncoder
(skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, encoding='utf-8', default=None)¶ Bases:
json.encoder.JSONEncoder
Class to allow arbitrary python objects to be json encoded with json.dumps()/etc by defining a .to_json() method on your object.
We need this for encoding lists of custom Event (etc) objects.
Usage: json.dumps(your_cool_object, cls=ObjectEncoder)
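A minimal sketch of the pattern using only the standard library; the Point class here is a hypothetical example object, not part of pypond:

```python
import json

class ObjectEncoder(json.JSONEncoder):
    """Defer to an object's .to_json() method when it defines one."""
    def default(self, o):
        if callable(getattr(o, 'to_json', None)):
            return o.to_json()
        # fall back to the base implementation, which raises TypeError
        return json.JSONEncoder.default(self, o)

class Point:  # hypothetical example object
    def __init__(self, x, y):
        self.x, self.y = x, y
    def to_json(self):
        return {'x': self.x, 'y': self.y}

encoded = json.dumps([Point(1, 2)], cls=ObjectEncoder)  # '[{"x": 1, "y": 2}]'
```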
-
default
(obj)¶ Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
-
-
class
pypond.util.
Options
(**kwargs)¶ Bases:
object
Encapsulation object for Pipeline options.
Example:
o = Options(foo='bar') and o = Options(); o.foo = 'bar' are identical.
Parameters: initial (dict, optional) – Can supply keyword args for initial values. -
to_dict
()¶
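Behaviorally, Options can be approximated by a simple attribute container; this sketch assumes to_dict() returns the stored keyword values:

```python
class Options:
    """Sketch of a Pipeline options container (assumed behavior)."""
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def to_dict(self):
        return dict(self.__dict__)

o1 = Options(foo='bar')
o2 = Options()
o2.foo = 'bar'
# the two construction styles from the example above are identical
same = o1.to_dict() == o2.to_dict() == {'foo': 'bar'}
```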
-
-
pypond.util.
aware_dt_from_args
(dtargs, localize=False)¶ generate an aware datetime object using datetime.datetime kwargs.
can generate a localized version as well, but please don’t.
Parameters: - dtargs (dict) – Dict containing the args you pass to datetime.datetime.
- localize (bool, optional) – Will create a new object in localtime, but just don’t do it.
Returns: New datetime object
Return type: datetime.datetime
Raises: UtilityException
– Raised if the args are wrong type.
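A hypothetical stdlib-only equivalent of the UTC path (the real function also handles localize=True and raises UtilityException on bad args, both omitted here):

```python
from datetime import datetime, timezone

def aware_dt_from_args(dtargs):
    # build an aware UTC datetime from datetime.datetime kwargs
    return datetime(tzinfo=timezone.utc, **dtargs)

dt = aware_dt_from_args(dict(year=2016, month=3, day=23, hour=21))
```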
-
pypond.util.
aware_utcnow
()¶ return an aware utcnow() datetime rounded to milliseconds.
Returns: New datetime object Return type: datetime.datetime
-
pypond.util.
dt_from_ms
(msec)¶ generate a datetime object from epoch milliseconds
Parameters: msec (int) – epoch milliseconds Returns: New datetime object from ms Return type: datetime.datetime
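An approximation of this helper (and its inverse, ms_from_dt()) with the standard library, assuming the documented aware-UTC behavior:

```python
from datetime import datetime, timezone

def dt_from_ms(msec):
    # epoch milliseconds -> aware UTC datetime (assumed equivalent behavior)
    return datetime.fromtimestamp(msec / 1000.0, tz=timezone.utc)

def ms_from_dt(dtime):
    # aware datetime -> epoch milliseconds
    return int(round(dtime.timestamp() * 1000))

dt = dt_from_ms(1458768183000)
round_trip = ms_from_dt(dt)  # 1458768183000
```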
-
pypond.util.
dt_is_aware
(dtime)¶ see if a datetime object is aware
Parameters: dtime (datetime.datetime) – A datetime object Returns: Returns True if the dtime is aware/non-naive. Return type: bool
-
pypond.util.
format_dt
(dtime, localize=False)¶ Format for human readable output.
Parameters: - dtime (datetime.datetime) – A datetime object
- localize (bool, optional) – Display as local time.
Returns: Formatted date string.
Return type: str
-
pypond.util.
generate_paths
(dic)¶ Generate a list of all possible field paths in a dict. This is for determining all paths in a dict when none is given.
Currently unused, but keeping since we will probably need it.
Parameters: dic (dict) – A dict, generally the payload from an Event class. Returns: A list of strings of all the paths in the dict. Return type: list
-
pypond.util.
humanize_dt
(dtime)¶ Format a datetime for human-readable display in humanize operations.
Parameters: dtime (datetime.datetime) – A datetime object Returns: Datetime formatted as a string. Return type: str
-
pypond.util.
humanize_dt_ago
(dtime)¶ format to “23 minutes ago” style format.
Parameters: dtime (datetime.datetime) – A datetime object Returns: Humanized string. Return type: str
-
pypond.util.
humanize_duration
(delta)¶ format for a single duration value - takes datetime.timedelta as arg
Parameters: delta (datetime.timedelta) – A time delta Returns: Humanize delta to duration. Return type: str
-
pypond.util.
is_function
(func)¶ Test if a value is a function.
Parameters: func (obj) – A value Returns: Is the object a python function? Return type: bool
-
pypond.util.
is_nan
(val)¶ Test if a value is NaN
Parameters: val (obj) – A value Returns: Is it NaN? Return type: bool
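A plausible stdlib sketch, assuming non-numeric values simply test False rather than raising:

```python
import math

def is_nan(val):
    # only an actual numeric NaN tests True; non-numbers are not NaN
    try:
        return math.isnan(val)
    except TypeError:
        return False

checks = (is_nan(float('nan')), is_nan(3.14), is_nan('text'), is_nan(None))
```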
-
pypond.util.
is_pipeline
(obj)¶ Test if something is a Pipeline object. This is put here with a deferred import statement to avoid circular imports, so the I/O modules don't need to import pipeline.py.
This probably does not need to be deferred, but doing it for safety's sake.
Parameters: obj (object) – An object to test to see if it’s a Pipeline. Returns: True if Pipeline Return type: bool
-
pypond.util.
is_pmap
(pmap)¶ Check this here so people don’t mistake pmap and PMap.
Parameters: pmap (obj) – An object Returns: Returns True if it is a pyrsistent.pmap Return type: bool
-
pypond.util.
is_pvector
(pvector)¶ Check this here so people don’t mistake PVector and pvector.
Parameters: pvector (obj) – An object Returns: Returns True if it is a pyrsistent.pvector Return type: bool
-
pypond.util.
is_valid
(val)¶ Test if a value is valid.
Parameters: val (obj) – A value Returns: Is it valid? Return type: bool
-
pypond.util.
localtime_from_ms
(msec)¶ generate an aware localtime datetime object from ms
Parameters: msec (int) – epoch milliseconds Returns: New datetime object Return type: datetime.datetime
-
pypond.util.
localtime_info_from_utc
(dtime)¶ Extract local TZ formatted values from an aware UTC datetime object. This is used by the index string methods when grouping data for local display.
Parameters: dtime (datetime.datetime) – An aware UTC datetime object Returns: A dict with formatted elements (zero-padded months, etc) extracted from the local version. Return type: dict
-
pypond.util.
monthdelta
(date, delta)¶ because we wish datetime.timedelta had a month kwarg.
Courtesy of: http://stackoverflow.com/a/3425124/3916180
Parameters: - date (datetime.date) – Date object
- delta (int) – Month delta
Returns: New Date object with delta offset.
Return type: datetime.date
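The linked Stack Overflow answer implements the offset roughly as follows; note the day-of-month is clamped, so adding a month to Jan 31 yields the end of February:

```python
from datetime import date

def monthdelta(d, delta):
    # sketch of a month-offset helper (algorithm from the cited SO answer)
    m = (d.month + delta) % 12
    y = d.year + (d.month + delta - 1) // 12
    if not m:
        m = 12
    # clamp the day to the length of the target month (leap-year aware)
    days_in_month = [31, 29 if y % 4 == 0 and (y % 100 != 0 or y % 400 == 0) else 28,
                     31, 30, 31, 30, 31, 31, 30, 31, 30, 31][m - 1]
    return d.replace(day=min(d.day, days_in_month), month=m, year=y)

end_of_feb = monthdelta(date(2020, 1, 31), 1)   # date(2020, 2, 29)
two_back = monthdelta(date(2020, 1, 15), -2)    # date(2019, 11, 15)
```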
-
pypond.util.
ms_from_dt
(dtime)¶ Turn a datetime object into ms since epoch.
Parameters: dtime (datetime.datetime) – A datetime object Returns: epoch milliseconds Return type: int
-
pypond.util.
nested_get
(dic, keys)¶ Address a nested dict with a list of keys to fetch a value. This is functionally similar to the standard functools.reduce() method employing dict.get, but this returns ‘bad_path’ if the path does not exist. This is because we need to differentiate between an existing value that is actually None vs. the dict.get() failover. Would have preferred to return False, but who knows if we’ll end up with data containing Boolean values.
sample_dict = dict()
nested_set(sample_dict, ['bar', 'baz'], 23)
nested_get(sample_dict, ['bar', 'quux'])
'bad_path'
Unlike nested_set(), this will not create a new path branch if it does not already exist.
Parameters: - dic (dict) – The dict we are working with
- keys (list) – A list of nested keys
Returns: Whatever value was at the terminus of the keys.
Return type: obj
-
pypond.util.
nested_set
(dic, keys, value)¶ Address a nested dict with a list of keys and set a value. If part of the path does not exist, it will be created.
sample_dict = dict()
nested_set(sample_dict, ['bar', 'baz'], 23)
{'bar': {'baz': 23}}
nested_set(sample_dict, ['bar', 'baz'], 25)
{'bar': {'baz': 25}}
Parameters: - dic (dict) – The dict we are workign with.
- keys (list) – A list of nested keys
- value (obj) – Whatever we want to set the ultimate key to.
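A sketch of both helpers, assuming the ‘bad_path’ sentinel described above for nested_get():

```python
def nested_set(dic, keys, value):
    # walk/create intermediate dicts, then set the final key
    for key in keys[:-1]:
        dic = dic.setdefault(key, {})
    dic[keys[-1]] = value

def nested_get(dic, keys):
    # walk the path; return the 'bad_path' sentinel if any key is missing
    for key in keys:
        if not isinstance(dic, dict) or key not in dic:
            return 'bad_path'
        dic = dic[key]
    return dic

sample = dict()
nested_set(sample, ['bar', 'baz'], 23)        # sample is now {'bar': {'baz': 23}}
found = nested_get(sample, ['bar', 'baz'])    # 23
missing = nested_get(sample, ['bar', 'quux']) # 'bad_path'
```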
-
pypond.util.
sanitize_dt
(dtime, testing=False)¶ Make sure the datetime object is in UTC/etc. Also round incoming datetime objects to milliseconds.
Allow disabling warnings when testing. Warning primarily exists to herd users into not passing in non-UTC tz datetime objects.
Parameters: - dtime (datetime.datetime) – A datetime object
- testing (bool, optional) – Suppress warnings when testing.
Returns: New datetime object rounded to ms from microseconds.
Return type: datetime.datetime
-
pypond.util.
to_milliseconds
(dtime)¶ Check to see if a datetime object has granularity smaller than millisecond (ie: microseconds) and massage back to ms if so.
Doing this round-trip seems kludgy and inefficient, but doing this:
return dtime.replace(millisecond=round(dt.millisecond, -3))
produced inconsistent results because of the rounding and I’m not going to start treating numbers like strings.
Parameters: dtime (datetime.datetime) – A datetime object. Returns: New datetime object rounded down to milliseconds from microseconds. Return type: datetime.datetime
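The "round down" behavior can be sketched by truncating the microsecond component in place (an assumption about the implementation, not a copy of it):

```python
from datetime import datetime, timezone

def to_milliseconds(dtime):
    # drop sub-millisecond precision by truncating microseconds
    return dtime.replace(microsecond=(dtime.microsecond // 1000) * 1000)

dt = datetime(2016, 3, 23, 21, 23, 3, 949123, tzinfo=timezone.utc)
ms_dt = to_milliseconds(dt)  # microsecond component becomes 949000
```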
-
pypond.util.
unique_id
(prefix='')¶ generate a uuid with a prefix - for debugging. This probably isn’t truly random but it’s random enough. Calling uuid.uuid4() was imposing non-trivial drag on performance. The calls to /dev/urandom can block on certain unix-like systems.
Parameters: prefix (str, optional) – Prefix for uuid. Returns: Prefixed uuid. Return type: str