Online Learning with Structured Streaming, Spark Summit Brussels 2016



Online Learning with Structured Streaming
Ram Sriharsha, Vlad Feinberg (@halfabrane)
Spark Summit, Brussels, 27 October 2016

What is online learning?
- Update the model parameters on each data point.
- You cannot visit data points again.
- In the batch setting, by contrast, you get to see the entire dataset before any update, and you can iterate over the data points as many times as you want.

An example: the perceptron
- Goal: find the best line separating the positive from the negative examples on a plane.
- Update rule: if y != sign(w·x), then w → w + y·x.

Why learn online?
- I want to adapt to changing patterns quickly: the data distribution can change; e.g., the distribution of features that affect learning might shift over time.
- I need to learn a good model within resource and time constraints (large-scale learning): the time to a given accuracy can be shorter for certain online algorithms.

Online classification setting
- Pick a hypothesis.
- For each labeled example (x, y):
  - Predict the label ŷ using the hypothesis.
  - Observe the loss ℓ(y, ŷ) (and its gradient).
  - Learn from the mistake and update the hypothesis.
- Goal: make as few mistakes as possible in comparison to the best hypothesis in hindsight.

An example: online SGD
- Initialize the weights w; the loss function ℓ is known.
- For each labeled example (x, y): perform the update w → w − η ∇ℓ(y, w·x).
- For each new example x: predict ŷ = σ(w·x), where σ is called the link function.
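The online SGD loop above can be sketched in plain Python (a minimal illustration, not the deck's Spark code; the logistic loss and the learning rate are assumptions):

```python
import math

def sgd_update(w, x, y, lr=0.1):
    """One online SGD step for the logistic loss; y is in {-1, +1}."""
    margin = y * sum(wi * xi for wi, xi in zip(w, x))
    # gradient coefficient of log(1 + exp(-y * w.x)) with respect to w
    coeff = -y / (1.0 + math.exp(margin))
    return [wi - lr * coeff * xi for wi, xi in zip(w, x)]

def predict(w, x):
    """The link function sigma is the logistic sigmoid here."""
    z = sum(wi * xi for wi, xi in zip(w, x))
    return 1.0 / (1.0 + math.exp(-z))

# One pass over a stream of labeled examples: each point is seen once.
w = [0.0, 0.0]
stream = [([1.0, 0.0], 1), ([0.0, 1.0], -1), ([1.0, 1.0], 1)]
for x, y in stream:
    w = sgd_update(w, x, y)
```

Note that the loop never revisits an example: that single constraint is what separates this from batch gradient descent.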
Distributed online learning
- Synchronous:
  - On each worker: load training data, compute gradients, update the model, and push the model to the driver.
  - On some node: perform the model merge.
  - (Note from Vlad Feinberg: we are using the synchronous model, but the driver doesn't do the merge; it's done on some worker.)
- Asynchronous:
  - On each worker: load training data, compute gradients, and push them to a server.
  - On each server: aggregate the gradients and perform the update step.

Challenges
- Not all algorithms admit efficient online versions.
- Lack of infrastructure:
  - Single machine: Vowpal Wabbit works great, but it is hard to use from Scala, Java, and other languages.
  - Distributed: no implementation that is fault tolerant, scalable, and robust.
- Lack of an open-source framework providing extensible algorithms: Adagrad, normalized learning, L1 regularization, online SGD, FTRL, ...

Structured Streaming
- One single API, DataFrame, for everything: the same API for machine learning, batch processing, and GraphX. (Dataset is a typed version of DataFrame for Scala and Java.)
- End-to-end exactly-once guarantees; the guarantees extend into the sources and sinks, e.g.
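One common synchronous pattern, in which each worker computes a gradient on its shard and a single node averages the gradients and applies the update, can be sketched in plain Python (a toy, single-process simulation; the 1-D least-squares model, learning rate, and data are assumptions, not the deck's implementation):

```python
# Synchronous distributed step for a 1-D least-squares model y = w * x:
# each worker computes a mean gradient on its shard; one node merges the
# gradients by averaging and applies a single update.
def gradient(w, shard):
    """Mean gradient of 0.5 * (w*x - y)^2 over one worker's shard."""
    return sum((w * x - y) * x for x, y in shard) / len(shard)

def sync_step(w, shards, lr=0.1):
    grads = [gradient(w, s) for s in shards]  # computed on each worker
    merged = sum(grads) / len(grads)          # merge step on one node
    return w - lr * merged

shards = [[(1.0, 2.0), (2.0, 4.0)], [(3.0, 6.0)]]  # data follows y = 2x
w = 0.0
for _ in range(50):
    w = sync_step(w, shards)
```

Because every worker starts each step from the same shared model, the result is deterministic; the asynchronous variant trades that determinism for less coordination.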
MySQL and S3.
- Understands external event-time:
  - Handles late-arriving data.
  - Supports sessionization based on event-time.

Structured Streaming: how does it work?
- At any time, the output of the application is equivalent to executing a batch job on a prefix of the data.

The model
- Input: data from the source as an append-only table. With a trigger of, say, every 1 second, the input at each trigger is all the data that has arrived up to that point.
- Trigger: how frequently to check the input for new data.
- Query: operations on the input; the usual map/filter/reduce, plus new window and session operations.
- Result: the final operated table, updated every trigger interval (the output for the data up to each trigger).
- Output: what part of the result to write to the data sink after every trigger:
  - Complete output: write the full result table every time.
  - Delta output: write only the rows of the result that changed since the previous batch.
  - Append output: write only new rows.
- Not all output modes are feasible with all queries.

Streaming ML on Structured Streaming
- Input: an append-only table containing labeled examples. As before, the input can be viewed as an ever-expanding table, representing the incoming stream of labeled examples.
- Query: a stateful aggregation query that picks up the last trained model and performs a distributed update + merge.
- Just as most machine learning algorithms can be thought of as iterative reductions over one's entire dataset, the streaming ML query is also just a reduce operation.
- Streaming ML, however, has key differences in semantics from both traditional ML algorithms and regular aggregations.
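The prefix guarantee above ("the output is equivalent to a batch job on a prefix of the data") can be illustrated with plain Python (a toy simulation, not the Structured Streaming API; the sum query is an arbitrary choice):

```python
# Toy simulation of the model: an append-only input table, an
# incrementally maintained result, and the equivalent batch query.
stream = [3, 1, 4, 1, 5, 9]

input_table = []   # the append-only input table
running_sum = 0    # the incrementally maintained result

for trigger, value in enumerate(stream, start=1):
    input_table.append(value)   # new data arrives at this trigger
    running_sum += value        # incremental update of the result
    # Invariant: at every trigger, the streaming result equals a
    # batch job run on the prefix of the data seen so far.
    assert running_sum == sum(input_table)
```

The streaming ML query relies on the same invariant, with "sum" replaced by a stateful model update.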
First, whereas a typical ML algorithm such as LogisticRegression performs multiple passes over the dataset as it improves the model, streaming ML will, in effect, do a single pass over this infinite table.

Streaming ML on Structured Streaming: output
- Result: a table of model parameters, updated every trigger interval.
  - Complete mode: the table has one row, which is constantly updated.
  - Append mode (in the works): the table holds a timestamp-keyed model, one row per trigger.

Second, where typical aggregations are stateless, streaming ML is stateful. When you calculate a sum of numbers, you don't need any state to compute partial sums of separate data on your machines: you can just start adding from 0. With an online algorithm, on the other hand, some kind of model state is baked in, and you learn on it as you see more and more examples. When we get a new batch of labeled data, we can't start from 0; we have to keep using the most recent model as we go along.

What this query emits is the resulting aggregation: the model trained on all of the table's data. In the streaming context, this means the model trained on all of the data seen so far.

Why is this hard?
- We need to update the model, i.e. update(previousModel, newDataPoint) = newModel.
- A typical aggregation is associative and commutative, e.g.:
  sum(P1: sum(sum(0, data[0]), data[1]),
      P2: sum(sum(0, data[2]), data[3]))
- A general model update violates associativity and commutativity!

Solution: make assumptions
- The result may be partition-dependent, but we don't care, as long as we get some valid result:
  average-models(
      P1: update(update(previous model, data[0]), data[1]),
      P2: update(update(previous model, data[2]), data[3]))
- The result is only partition-dependent if update and average don't commute; otherwise it can still be deterministic.

Stateful aggregator
- Within each partition:
  - Initialize with the previous state (instead of zero, as in a regular aggregator).
  - For each item, update the state.
- Perform the reduce step.
- Output the final state.
- A very general abstraction: it works for sketches, online statistics (quantiles), online clustering, ...

How does it work?
- The driver asks the labeled stream source: is there more data? If yes, it runs the query.
- Map tasks read labeled examples, apply feature transforms and gradient updates, and read the last saved model from the state store.
- A reduce task performs the model averaging and saves the model back to the state store.

APIs

ML Estimator on streams
- Interoperable with ML pipelines.
- Input: a streaming DataFrame of labeled data.
- Output: a stream of models, updated over time.
- Interoperability: you can use this alongside or inside your queries, with any input streaming DataFrame.
- Batch integration: you get out a stream of models (e.g. to S3).

Batch interoperability
- Seamless application on batch datasets: model = fit() on a static DataFrame (a batch dataset) runs a single pass of the algorithm.
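The contrast above, and the update-then-average fix, can be sketched in plain Python (a simplified stand-in for the deck's Spark aggregator; the moving-average update rule and the partitioning are assumptions):

```python
# Toy sketch of the stateful aggregator. A sum is associative and
# commutative, so partitions can start from 0; a general online update
# is not, so each partition starts from the previous model and the
# partition results are merged by averaging.
def update(model, point, lr=0.5):
    """Toy online update: move the model toward the data point."""
    return model + lr * (point - model)

def run(model, points):
    for p in points:
        model = update(model, p)
    return model

# Order matters for the update rule (unlike for sum):
assert run(0.0, [2.0, 3.0, 4.0]) != run(0.0, [4.0, 3.0, 2.0])

def aggregate_batch(previous_model, partitions):
    """The update + merge step for one trigger's worth of data."""
    partials = [run(previous_model, part)  # per partition, from prev state
                for part in partitions]
    return sum(partials) / len(partials)   # merge step: average the models

model = 0.0
for batch in [[[1.0, 2.0], [3.0]], [[2.0], [4.0, 2.0]]]:
    model = aggregate_batch(model, batch)
```

The result depends on how the data is partitioned, which is exactly the assumption the deck makes: any such result is accepted as valid.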
Great for speed on large datasets, and for analyzing online model behavior in a controlled environment.

Feature creation
- Handle new features as they appear (e.g., IPs in fraud detection).
- Provide transformers, such as the HashingEncoder, that apply the hashing trick: encode arbitrary (possibly categorical) data, without knowing its cardinality ahead of time, via a high-dimensional sparse mapping.

API goals
- Provide modern, regret-minimization-based online algorithms: online logistic regression, Adagrad, online gradient descent, L2 regularization.
- Accept input streams of any kind.
- Streaming-aware feature engineering.

What's next?
- More bells and whistles: adaptive normalization, L1 regularization.
- More algorithms: online quantile estimation? More general sketches? Online clustering?
- Scale testing and benchmarking.

Demo
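The hashing trick mentioned above can be sketched in a few lines of plain Python (an illustration only, not the deck's HashingEncoder implementation; the bucket count is an arbitrary choice):

```python
# Hashing trick: map arbitrary categorical features into a fixed-size
# sparse vector without knowing the feature cardinality ahead of time.
def hash_features(features, num_buckets=2**18):
    """Return a sparse {index: count} encoding of string features."""
    vec = {}
    for f in features:
        # hash() of a str is stable within one Python process
        idx = hash(f) % num_buckets
        vec[idx] = vec.get(idx, 0) + 1
    return vec

# A never-before-seen feature (e.g. a new IP) still lands in a valid
# bucket, so the model's dimensionality never changes.
v = hash_features(["ip=203.0.113.7", "country=BE"])
```

The price is occasional collisions, but with a large bucket count they are rare and in practice act as mild regularization.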