max `(pickup_latitude) over w as vendor_max_pl,`
min `(pickup_latitude) over w as vendor_min_pl,`
avg `(pickup_latitude) over w as vendor_avg_pl,`
sum `(pickup_latitude) over w2 as pc_sum_pl,`
max `(pickup_latitude) over w2 as pc_max_pl,`
min `(pickup_latitude) over w2 as pc_min_pl,`
avg `(pickup_latitude) over w2 as pc_avg_pl ,`
count `(vendor_id) over w2 as pc_cnt,`
count `(vendor_id) over w as vendor_cnt`
from {}
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW) `
我们选择了 vendor_id 和 passenger_count 两个维度做时序特征。
# Run the feature-extraction SQL (defined above) on Spark to build the training set.
train_df = spark.sql(train_sql)

# LightGBM training configuration.
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0,
}

print('Starting training...')
# NOTE(review): lgb_train / lgb_eval are assumed to be LightGBM Datasets
# built from train_df elsewhere in the original article — confirm.
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
# Run the training and persist the model to model.txt.
gbm.save_model('model.txt')
模型推理过程【半小时,将你的 Spark SQL 模型变为在线服务】。
以下是导入数据的代码与模型推理逻辑:
def insert_row(line):
    """Parse one CSV line of the taxi dataset and insert it into table t1.

    Columns 2 and 3 are datetime strings in '%Y-%m-%d %H:%M:%S' format;
    they are converted to millisecond epoch values with an 'l' suffix
    (FEDB bigint-literal syntax — TODO confirm) before being spliced
    into the INSERT statement.

    :param line: one raw CSV line (comma-separated, no embedded commas).
    """
    row = line.split(',')
    fmt = '%Y-%m-%d %H:%M:%S'
    row[2] = '%dl' % int(datetime.datetime.strptime(row[2], fmt).timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(row[3], fmt).timestamp() * 1000)
    # WARNING: SQL is built by string interpolation; acceptable for this
    # trusted demo CSV, but use parameterized inserts for untrusted input.
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)
# Stream the training CSV into FEDB, skipping the header row.
# NOTE: the original scraped text read line.replace('n', ''), which would
# strip every letter "n" from the data; the intended call is '\n'.
with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    for idx, line in enumerate(fd):
        if idx == 0:
            continue  # header row, not data
        insert_row(line.replace('\n', ''))
注:train.csv 为训练数据的 CSV 格式版本。
predict.py 最终执行效果:发送推理请求后,会看到如下输出。
def post(self):
    """Handle an HTTP prediction request.

    Builds a FEDB request row from the JSON request body, executes the
    feature SQL against db_test, converts the result row into a feature
    vector via build_feature, and writes the LightGBM trip_duration
    prediction back to the client. On any failure a short error string
    is written and the handler returns early.
    """
    row = json.loads(self.request.body)
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # First pass: total byte length of all string columns — the request
    # builder needs it up front to reserve buffer space.
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # Second pass: append each column in schema order, coercing the JSON
    # value to the column's declared type; unknown types become NULL.
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()  # advance the result set to its first (only) row
    ins = build_feature(rs)
    self.write("----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration -------------\n")
    self.write("%s s" % str(duration[0]))
使用 python3 predict.py 运行 demo(完整 demo 请参见原文仓库)。
----------------ins---------------
[[ 2. 40.774097 40.774097 40.774097 40.774097 40.774097 40.774097
40.774097 40.774097 1. 1. ]]
---------------predict trip_duration -------------
859.3298781277192 s `
推荐阅读
- 高考将至 有利孩子考高分的风水“小贴士”
- 大寒 , 年关将至 好茶把温暖带回家
- 霜降将至 冬天还会远吗
- 5个规则,确保你的微服务优化运行
- 心理测试 你的择偶标准是什么
- 充电器|逐渐“果化”?马斯克证实特斯拉将不再赠送移动充电器
- 古树滇红茶如何泡,福元昌2014年古树滇红100克即将上市
- 骁龙888|高通骁龙888+80W+OIS防抖全部下放!OPPO K10 Pro即将登场
- iPhone|传iPhone 14推出后iPhone 11将停产:一代LCD神机要拜拜了
- 天玑8100|不止首发天玑8100-MAX!一加Ace将搭载一代“神底”IMX766
