Commit 47e430f0 authored by guo's avatar guo
Browse files

fix:factory

Showing with 321 additions and 94 deletions
+321 -94
......@@ -23,6 +23,7 @@
-export([
start_time/0,
get_iso_8601/1,
get_iso_8601_loacl/0,
timezone/0,
nowstamp/0,
local_time/0,
......@@ -60,6 +61,24 @@
-import(dgiot_utils, [to_int/1, to_list/1, to_binary/1, tokens/2]).
-define(TIMEZONE, + 8).
get_iso_8601_loacl() ->
{{Y1, M1, D1}, {H1, Mm1, S1}} = calendar:local_time(),
F =
fun(Num) ->
NumL = to_list(Num),
case length(NumL) of
1 ->
"0" ++ NumL;
_ ->
NumL
end
end,
[Y, M, D, H, Mn, S] = [F(Num) || Num <- [Y1, M1, D1, H1, Mm1, S1]],
lists:concat([Y, "-", M, "-", D, "T", H, ":", Mn, ":", S]).
get_iso_8601(Expire_syncpoint) ->
utc(Expire_syncpoint).
......@@ -324,7 +343,7 @@ last_month(Count) ->
last_month(StartTime, EndTime, Count - 1).
last_month(StartTime, EndTime, 0) ->
{StartTime*1000, EndTime*1000};
{StartTime * 1000, EndTime * 1000};
last_month(StartTime, EndTime, Count) ->
{{Year, Month, _Day}, {_Hour, _Minute, _Second}} = dgiot_datetime:unixtime_to_localtime(StartTime),
......
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_tcp2dlink_channel).
-behavior(dgiot_channelx).
-author("kenneth").
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
-define(TYPE, <<"TCP2DLINK">>).
-define(MAX_BUFF_SIZE, 1024).
-record(state, {
id,
productId
,
is_sub
}).
%% API
-export([start/2]).
%% Channel callback
-export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]).
%% 注册通道类型
-channel_type(#{
cType => ?TYPE,
type => ?PROTOCOL_CHL,
title => #{
zh => <<"TCP2DLINK"/utf8>>
},
description => #{
zh => <<"TCP2DLINK"/utf8>>
}
}).
%% 注册通道参数
-params(#{
<<"port">> => #{
order => 1,
type => integer,
required => true,
default => 18110,
title => #{
zh => <<"端口"/utf8>>
},
description => #{
zh => <<"侦听端口"/utf8>>
}
},
<<"ico">> => #{
order => 102,
type => string,
required => false,
default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/TcpIcon.jpeg">>,
title => #{
en => <<"channel ICO">>,
zh => <<"通道ICO"/utf8>>
},
description => #{
en => <<"channel ICO">>,
zh => <<"通道ICO"/utf8>>
}
}
}).
start(ChannelId, ChannelArgs) ->
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
%% 通道初始化
init(?TYPE, ChannelId, #{
<<"port">> := Port
}) ->
State = #state{
id = ChannelId
,
is_sub = 0
},
{ok, State, dgiot_tcp2dlink_worker:child_spec(Port, State)}.
handle_init(State) ->
{ok, State}.
%% 通道消息处理,注意:进程池调用
handle_event(_EventId, Event, State) ->
?LOG(info, "Channel ~p", [Event]),
{ok, State}.
handle_message(Message, State) ->
?LOG(info, "Channel ~p", [Message]),
{ok, State}.
stop(ChannelType, ChannelId, _State) ->
?LOG(warning, "Channel[~p,~p] stop", [ChannelType, ChannelId]),
ok.
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_tcp2dlink_worker).
-author("kenneth").
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
-define(TYPE, <<"TCP2DLINK">>).
-define(MAX_BUFF_SIZE, 1024).
-record(state, {
id,
productId
,
is_sub
}).
%% TCP callback
-export([child_spec/2, init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
child_spec(Port, State) ->
dgiot_tcp_server:child_spec(?MODULE, Port, State).
%% =======================
%% {ok, State} | {stop, Reason}
init(#tcp{state = #state{id = ChannelId} = State} = TCPState) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, ?TYPE, ProductId} ->
%% do_cmd(ProductId, connection_ready, <<>>, TCPState),
NewState = State#state{productId = ProductId},
{ok, TCPState#tcp{state = NewState}};
{error, not_find} ->
{error, not_find_channel}
end;
init(#tcp{state = State} = _TCPState) ->
io:format("~s ~p State = ~p ~n", [?FILE, ?LINE, State]),
{state, ID} = State,
io:format("~s ~p ID = ~p ~n", [?FILE, ?LINE, ID]),
ok.
%% task2device 下行
handle_info({deliver, _, Msg}, TCPState) ->
io:format("~s ~p here ~n", [?FILE, ?LINE]),
Payload = dgiot_mqtt:get_payload(Msg),
Topic = dgiot_mqtt:get_topic(Msg),
case binary:split(Topic, <<$/>>, [global, trim]) of
[<<"thing">>, ProductId, DevAddr, <<"tcp">>, <<"hex">>] ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
dgiot_device:save_log(DeviceId, Payload, ['tcp_send']),
dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(dgiot_utils:trim_string(Payload))),
{noreply, TCPState};
_ ->
case jsx:is_json(Payload) of
true ->
case jsx:decode(Payload, [{labels, binary}, return_maps]) of
#{<<"cmd">> := <<"send">>} = Cmd ->
handle_info(Cmd, TCPState);
Info ->
handle_info(Info, TCPState)
end;
false ->
{noreply, TCPState}
end
end;
% device2task 上行
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, productId = ProductIds,is_sub = Is_sub} = State} = TCPState) ->
dgiot_bridge:send_log(ChannelId, "Payload ~s", [dgiot_utils:to_list(Buff)]),
case jsx:is_json(Buff) of
true ->
Map = jsx:decode(Buff, [{labels, binary}, return_maps]),
ProductId = lists:nth(1, ProductIds),
dgiot_bridge:send_log(ChannelId, ProductId, " Recv ~s", [jsx:encode(Map)]),
case maps:find(<<"devaddr">>, Map) of
{ok, DevAddr} ->
NewState = case Is_sub of
0 ->
dgiot_mqtt:subscribe(<<"$dg/device/", ProductId/binary, "/", DevAddr/binary, "/#">>),
State#state{is_sub = 1} ;
_ ->
State
end,
Payload = maps:without([<<"devaddr">>], Map),
dgiot_dlink_proctol:properties_report(ProductId, DevAddr, Payload),
dgiot_bridge:send_log(ChannelId, ProductId, "Device:~p Save_td ~s", [DevAddr,jsx:encode(Payload)]),
{noreply, TCPState#tcp{state = NewState}};
_ ->
pass
end;
_ ->
{noreply, TCPState}
end;
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
handle_info(_Info, TCPState) ->
{noreply, TCPState}.
handle_call(_Msg, _From, TCPState) ->
{reply, ok, TCPState}.
handle_cast(_Msg, TCPState) ->
{noreply, TCPState}.
terminate(_Reason, #tcp{state = #state{productId = _ProductId}} = _TCPState) ->
ok;
terminate(_Reason, _TCPState) ->
ok.
code_change(_OldVsn, TCPState, _Extra) ->
{ok, TCPState}.
##--------------------------------------------------------------------
## delete field
##--------------------------------------------------------------------
parse.delete_field = ACL,objectId,updatedAt,createdAt
##--------------------------------------------------------------------
## parse config
##--------------------------------------------------------------------
parse.parse_server = http://47.118.69.187:1337
parse.parse_path = /parse/
parse.parse_appid = 19eec9ef27fb927080413a8b5706a446
parse.parse_master_key = c30e91c5956361223c70eafed8cbeddd
parse.parse_js_key = 53c10f7ea8d372e7e85f523228e00433
parse.parse_rest_key = 39280b2ebf766aa0e6b7a76dc17b93dc
##--------------------------------------------------------------------
## parse cache
##--------------------------------------------------------------------
## auto_save自动坚持保存时间 单位 毫秒
parse.cache.auto_save = 10000
## max_size 最大存储数量
parse.cache.max_size = 100000
## max_memory 最大内存数,单位byte
parse.cache.max_memory = 1024000
## max_time 最大时间间隔,单位秒
parse.cache.max_time = 60
......@@ -22,24 +22,13 @@
%% Application callbacks
-export([start/2, stop/1]).
-export([start_hook/0,stop_hook/0]).
%%--------------------------------------------------------------------
%% Application callbacks
%%--------------------------------------------------------------------
start(_StartType, _StartArgs) ->
start_hook(),
dgiot_factory_sup:start_link().
stop(_State) ->
stop_hook(),
ok.
start_hook() ->
dgiot_hook:add(one_for_one, {factory, save_data}, fun dgiot_factory_data:handle_data/1).
stop_hook() ->
dgiot_hook:remove({factory, save_data}).
......@@ -131,7 +131,6 @@ save_data(ProductId, DeviceId, Type, Payload) ->
case F of
{ok, NewPayload} ->
Id = maps:get(?SHEETID(Type), NewPayload, get_id(DevAddr, Type)),
dgiot_task:save_td_no_match(ProductId, DevAddr, NewPayload#{?SHEETID(Type) => Id}, #{});
_ ->
{error, <<"run_hook_failed">>}
......@@ -140,7 +139,6 @@ save_data(ProductId, DeviceId, Type, Payload) ->
{error, <<"run_hook_failed">>}
end;
{error, not_find} ->
io:format("~s ~p here~n",[?FILE,?LINE]),
Id = maps:get(?SHEETID(Type), Payload, get_id(DevAddr, Type)),
dgiot_task:save_td_no_match(ProductId, DevAddr, Payload#{?SHEETID(Type) => Id}, #{});
......
......@@ -26,6 +26,7 @@ get_work_sheet(ProductId, Type, Channel, DeviceId, Where, Limit, Skip, New) ->
{ok, ParseData} ->
case get_history(Channel, ProductId, DeviceId, ThingMap, Td, Limit, Skip, Type, New) of
{ok, #{<<"results">> := HistoryData}} ->
io:format("~s ~p DeviceId= ~p ~n",[?FILE,?LINE,DeviceId]),
{Total, Res} = filter_data(Limit, Skip, HistoryData),
MergeData = merge_data(ParseData, Res, DeviceId, ThingMap),
NamedData = dgiot_factory_utils:turn_name(MergeData, ThingMap),
......@@ -34,7 +35,7 @@ get_work_sheet(ProductId, Type, Channel, DeviceId, Where, Limit, Skip, New) ->
error
end;
_ ->
{ok, <<"nodata">>}
{ok, {0,[]}}
end;
_ ->
{error, not_find_thing}
......@@ -75,7 +76,6 @@ search_parse(DeviceList, Parse, Type) when is_list(DeviceList) ->
search_parse(DeviceId, undefined, Type) ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"content">> := #{Type := Data}}} ->
FlatternMap = dgiot_map:flatten(#{Type => Data}),
{ok, FlatternMap#{<<"objectId">> => DeviceId}};
_ ->
......@@ -87,13 +87,11 @@ search_parse(DeviceId, Parse, Type) ->
0 ->
search_parse(DeviceId, undefined, Type);
Num ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"content">> := Content}} ->
FlatMap = dgiot_map:flatten(Content),
MatchNum = maps:fold(
fun(K, V, Acc) ->
case maps:find(K, FlatMap) of
{ok, V} ->
Acc + 1;
......@@ -127,47 +125,31 @@ filter_where(Where, ProductId, Type) ->
_ ->
jsx:decode(Where)
end,
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := PropertiesList}}} ->
{Parse, Td} = lists:foldl(
fun(X, {Parse, Td}) ->
Identifier = maps:get(<<"identifier">>, X, <<"">>),
case lists:member(Identifier, maps:keys(MapWhere)) of
true ->
case X of
#{<<"isstorage">> := false, <<"devicetype">> := Type} ->
{Parse#{Identifier => maps:get(Identifier, MapWhere)}, Td};
#{<<"isstorage">> := true, <<"devicetype">> := Type} ->
{Parse, Td#{Identifier => maps:get(Identifier, MapWhere)}};
_ ->
{Parse, Td}
case get_ThingMap(Type, ProductId) of
{ok, ThingMap} ->
{Parse, Td} = maps:fold(
fun(K,V,{Parse, Td})->
case maps:is_key(K,ThingMap) of
true ->
{Parse, Td#{K => V}};
_ ->
{Parse#{K => V}, Td}
end;
false ->
{Parse, Td}
end
end, {#{}, #{}}, PropertiesList),
TdWithPerson = case lists:member(<<"person">>, maps:keys(MapWhere)) of
end,{#{},#{}},MapWhere),
TdWithPerson = case maps:is_key(<<"person">>,ThingMap) of
true ->
Td#{<<"person">> => maps:get(<<"person">>, MapWhere)};
false ->
Td
end,
case get_ThingMap(Type, ProductId) of
{ok, ThingMap} ->
{Parse, TdWithPerson, ThingMap};
_ ->
error
end;
{Parse, TdWithPerson, ThingMap};
_ ->
error
end.
get_ThingMap(Type, ProductId) ->
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Properties}}} ->
......@@ -203,6 +185,7 @@ get_history(Channel, ProductId, DeviceId, ThingMap, Where, _Limit, _Skip, Type,
{error, wrong_td_platform};
_ ->
DB = dgiot_tdengine_select:format_db(?Database(ProductId)),
TableName = case is_list(DeviceId) of
true ->
?Table(ProductId);
......@@ -317,7 +300,7 @@ get_from(<<"true">>, DB, TableName, Type, List) ->
Select = dgiot_utils:to_binary(lists:nthtail(2, Last)),
SheetId = ?SHEETID(Type),
<<"(select ", Select/binary, " from ", DB/binary, TableName/binary, " group by ", SheetId/binary, " ) ">>;
<<"( select ", Select/binary, " from ", DB/binary, TableName/binary, " group by ", SheetId/binary, " ) ">>;
get_from(_, DB, TableName, _, _) ->
<<DB/binary, TableName/binary>>.
......
......@@ -151,7 +151,7 @@ do_request(get_data, #{<<"productId">> := undefined, <<"objectId">> := undefined
{error, <<"get_data_failed">>}
end;
_ ->
io:format("~s ~p here~n", [?FILE, ?LINE]),
{error, <<"notfinddevice">>}
end
......@@ -171,7 +171,6 @@ do_request(get_data, #{<<"productId">> := ProductId, <<"objectId">> := undefined
{error, <<"get_data_failed">>}
end;
_ ->
io:format("~s ~p here~n", [?FILE, ?LINE]),
{error, <<"notfinddevice">>}
end
......
......@@ -19,35 +19,60 @@
-include_lib("dgiot/include/logger.hrl").
-export([store_all/5]).
-export([get_num/2, get_name/2, turn_name/2, turn_num/3]).
store_all(_, _, DeviceId, _, <<"false">>) ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"realstatus">> => 2});
store_all(ProductId, Channel, DeviceId, Operator, <<"true">>) ->
case dgiot_factory_getdata:get_work_sheet(ProductId, <<"product">>, Channel, DeviceId, undefined, undefined, 0, <<"true">>) of
{ok, {_, Res}} ->
lists:foldl(
fun(X, _) ->
case maps:get(<<"product_condition">>, X) of
2 ->
dgiot_factory_channel:save_data(ProductId, DeviceId, <<"product">>, X#{<<"product_condition">> := 3, <<"product_storedperson">> => Operator});
_ ->
pass
end
dgiot_factory_channel:save_data(ProductId, DeviceId, <<"product">>, X#{<<"product_condition">> := 3, <<"product_storedperson">> => Operator})
end, [], Res),
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"detail">> := Detail}} ->
EndTime = dgiot_datetime:format("YYYY-MM-DD HH:NN:SS"),
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"realstatus">> => 8, <<"detail">> => Detail#{<<"taskend">> => EndTime}});
_ ->
error
end;
handle_dingdan(DeviceId, ProductId, Channel);
{ok, _} ->
io:format("~s ~p DeviceId= ~p ~n", [?FILE, ?LINE, DeviceId]),
handle_dingdan(DeviceId, ProductId, Channel);
_ ->
error
end;
store_all(_,_, _, _, _) ->
store_all(_, _, _, _, _) ->
error.
handle_dingdan(DeviceId, ProductId, Channel) ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"detail">> := Detail}} ->
EndTime = dgiot_datetime:format("YYYY-MM-DD HH:NN:SS"),
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"realstatus">> => 8, <<"detail">> => Detail#{<<"taskend">> => EndTime}}),
Res = calculate_jindiedata(ProductId, DeviceId, Channel),
dgiot_jienuo_meter:test(Res, DeviceId);
_ ->
error
end.
calculate_jindiedata(ProductId, DeviceId, Channel) ->
case dgiot_factory_getdata:get_work_sheet(ProductId, <<"product">>, Channel, DeviceId, undefined, undefined, 0, <<"true">>) of
{ok, {_, Res}} ->
Product_mhour = lists:foldl(
fun(X, Acc) ->
case maps:get(<<"product_mhour">>, X) of
{ok, WorkTime} ->
Acc + WorkTime;
_ ->
Acc
end
end, 0, Res),
#{<<"product_mhour">> => Product_mhour};
_ ->
error
end.
get_num(K, V) ->
Id = dgiot_parse_id:get_dictid(K, K, K, K),
......@@ -77,7 +102,6 @@ get_num(K, V) ->
_ ->
error
end
end.
get_name(K, Num) ->
......
......@@ -39,6 +39,8 @@ add_field(#{<<"type">> := <<"image">>}, Database, TableName, LowerIdentifier) ->
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " BIGINT;">>;
add_field(#{<<"type">> := <<"date">>}, Database, TableName, LowerIdentifier) ->
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " TIMESTAMP;">>;
add_field(#{<<"type">> := <<"long">>}, Database, TableName, LowerIdentifier) ->
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " BIGINT;">>;
add_field(#{<<"type">> := Type}, Database, TableName, LowerIdentifier) ->
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " ", Type/binary, ";">>.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment