--- /dev/null
+
+/* -}{----------------------------------------------------------------------- */
+
+#include <kernelapi.h>
+#include <ni.h>
+
+/* -}{----------------------------------------------------------------------- */
+
+#define TMPBUFSIZE 4096
+static char tmpbuf[TMPBUFSIZE];
+static k_hashtable* own_resources;
+
+/* -}{----------------------------------------------------------------------- */
+
+extern void run_tests(void);
+
+extern char* make_cache_path(char* uri);
+extern void look_in_file_cache(ni_event* evq);
+extern void save_in_file_cache(ni_resource* res);
+
+extern void init_uri2chan(void);
+extern char* get_host_for(char* uri);
+extern char* get_channel_for(char* host);
+extern char* use_ping_info(k_hashtable*, k_channel*);
+extern void use_from_info(k_hashtable*, k_channel*);
+extern void ping_tunnels(void);
+extern void send_ping(k_channel* chan, char* firstline, char* to);
+
+/* -}{----------------------------------------------------------------------- */
+
+static int handles_resource(char* name);
+static void sync_resource(ni_resource* res);
+ int connection_writable(k_channel* chan, int bufpos, int len);
+ int connection_readable(k_channel* chan, int bufpos, int len);
+static int recv_next_event( k_channel* chan);
+static void recv_request( k_channel* chan, char* header);
+static void recv_response( k_channel* chan, char* header);
+static void got_mmap(char*, char*, char*, int, k_stat, void*);
+static void set_read_buffer(k_channel*, char*, size_t, ni_event*);
+static int recv_entity( k_channel* chan, int bufpos, int eof);
+static int expecting_response(char* pub, ni_event* evt, k_channel*);
+static void do_request( ni_event* evq);
+ void ensure_self_sub(ni_event* evq);
+static void ping_resource_subs(void* arg, char* key, void* val);
+static void ping_sub(ni_resource* res, k_hashtable* sub);
+static ni_resource* own_resource(char* uri);
+static void send_request(ni_event* evq);
+static void send_response(ni_event* evt);
+static k_channel* ensure_chan(char* chanm);
+
+/* -}{----------------------------------------------------------------------- */
+
+EXPORT int np_module_loaded(void)
+{
+ ni_register_driver("np", handles_resource, sync_resource);
+
+ if(strstr(k_version, "test")){
+ //run_tests();
+ }
+
+ init_uri2chan();
+
+ own_resources=k_hashtable_new("Own Resources", 0);
+
+ k_log_out("NP Driver initialised");
+
+ return 0;
+}
+
+EXPORT void np_module_tick(void)
+{
+ static long tix;
+ tix++;
+ if(!(tix % 1000)){
+ k_hashtable_apply(own_resources, ping_resource_subs, 0);
+ }
+ if(!(tix % 3000)){
+ ping_tunnels();
+ }
+}
+
+EXPORT int np_module_event(void* data)
+{
+ ni_event* evt=data;
+ if(!k_hashtable_get(evt->ent_head, "Status:")){
+ do_request(evt);
+ }
+ else{
+ send_response(evt);
+ }
+ return 0;
+}
+
+/* -}{----------------------------------------------------------------------- */
+
+int handles_resource(char* name)
+{
+ return 0;
+}
+
+void sync_resource(ni_resource* res)
+{
+ save_in_file_cache(res);
+}
+
+/* -}{----------------------------------------------------------------------- */
+
+int connection_readable(k_channel* chan, int bufpos, int len)
+{
+ if(0) k_log_out("connection_readable %s %p %d %d %p",
+ chan->name, chan, bufpos, len, chan->context);
+ int sof=(len== 0);
+ int eof=(len== -1);
+
+ if(sof) return 0;
+
+ do{
+ ni_event* evt=chan->context;
+ if(!evt){
+ int n=recv_next_event(chan);
+ if(n<0) break;
+ bufpos-=n;
+ }
+ else{
+ int n=recv_entity(chan, bufpos, eof);
+ if(n<0) break;
+ bufpos-=n;
+ }
+
+ } while(1);
+
+ if(eof && chan->context){
+ ni_event* evt=chan->context;
+ evt->entity=0;
+ ni_event_delete(evt);
+ chan->context=0;
+ }
+
+ return 0;
+}
+
+int connection_writable(k_channel* chan, int bufpos, int len)
+{
+ if(0) k_log_out("connection_writable %p %d %d %p",
+ chan, bufpos, len, chan->context);
+//if(len>20000) exit(1);
+ int sof=(len== 0);
+ int eof=(len== -1);
+
+ if(sof){
+ send_ping(chan, "PING ni/0.5" CRLF, 0);
+ return 0;
+ }
+
+ if(eof && chan->context){
+ ni_event* evt=chan->context;
+ evt->entity=0;
+ ni_event_delete(evt);
+ chan->context=0;
+ }
+
+ return 0;
+}
+
+/* -}{---- Receiving -------------------------------------------------------- */
+
+int recv_next_event(k_channel* chan)
+{
+ char* header=k_channel_chop_div(chan, CRLF CRLF);
+ if(!header) return -1;
+ int n=strlen(header)+strlen(CRLF CRLF);
+
+ if(!strncmp(header, "GET", 3) ||
+ !strncmp(header, "SUB", 3) ||
+ !strncmp(header, "UNSUB",5) ||
+ !strncmp(header, "HEAD", 4) ||
+ !strncmp(header, "PING", 4) ){
+
+ recv_request(chan, header);
+ return n;
+ }
+ if(!strncmp(header, "ni/", 4) ){
+
+ recv_response(chan, header);
+ return n;
+ }
+ k_free(header);
+ k_log_err("Failed reading request or response - closing connection");
+ k_channel_close(chan);
+ return n;
+}
+
+void recv_request(k_channel* chan, char* header)
+{
+ ni_event* evq;
+ evq=ni_get_request_headers(header);
+ if(!evq){
+ k_log_err("Failed reading request headers - closing connection");
+ k_channel_close(chan);
+ return;
+ }
+ if(!k_hashtable_isn(evq->evt_head, "Protocol:", "ni/", 4)){
+ ni_event_delete(evq);
+ k_log_err("Failed reading request not ni - closing connection");
+ k_channel_close(chan);
+ return;
+ }
+
+ k_hashtable* ent_head=evq->ent_head;
+ int ping=k_hashtable_is(ent_head, "Method:", "PING");
+
+ if(!evq->uri && !ping){
+ evq->uri=k_strdup(chan->name);
+ k_hashtable_put_dup(ent_head, "URI:", chan->name);
+ }
+
+ ni_event_show(evq, "ni Protocol Request");
+
+ if(k_hashtable_isi(evq->evt_head, "Connection:", "Keep-Alive")){
+ chan->linger=1;
+ if(k_hashtable_isn(ent_head, "Sub-To:", "./test", 6)){
+ chan->linger=0;
+ }
+ }
+ if(ping){
+ char* from=use_ping_info(ent_head, chan);
+ if(from) send_ping(chan, "ni/0.5 270 PING" CRLF, from);
+ ni_event_delete(evq);
+ return;
+ }
+ //use_from_info(ent_head, chan);
+
+ ni_event* evp=ni_event_new(evq->uri, 0, k_hashtable_dup(ent_head), 0);
+
+ ni_event_delete(evq);
+
+ k_event_post("ni", evp);
+}
+
+void recv_response(k_channel* chan, char* header)
+{
+ ni_event* evt=ni_get_response_headers(header);
+ if(!evt){
+ k_log_err("recv_response: headers failed but doing nothing!");
+ return;
+ }
+ char* pub= evt->uri;
+ k_hashtable* ent_head=evt->ent_head;
+
+ if(!expecting_response(pub, evt, chan)) return;
+
+ ni_event_show(evt, "Response");
+
+ int head=k_hashtable_is( ent_head, "Status:", "260");
+ int nmod=k_hashtable_is( ent_head, "Status:", "304");
+ int ping=k_hashtable_is( ent_head, "Status:", "270");
+ int cl =k_hashtable_get_int(ent_head, "Content-Length:");
+ int entity=!(head || nmod || ping || cl==0);
+
+ if(ping){
+ use_ping_info(ent_head, chan);
+ ni_event_delete(evt);
+ return;
+ }
+ use_from_info(ent_head, chan);
+
+ if(entity){
+ k_hashtable_set(ent_head, "Status:", "260");
+ k_hashtable_set(ent_head, "Status-Text:", "Headers Only");
+ }
+ k_event_post("ni", evt);
+
+ if(entity){
+
+ k_hashtable* eh=k_hashtable_new("nHeaders/recv_response", 1);
+ char* from =k_hashtable_get(ent_head, "From:");
+ char* contlen =k_hashtable_get(ent_head, "Content-Length:");
+ char* cux =k_hashtable_get(ent_head, "CUX:");
+ k_hashtable_set(eh, "Status:", "206");
+ k_hashtable_set(eh, "Status-Text:", "Partial Content");
+ k_hashtable_put_dup(eh, "From:", from);
+ k_hashtable_put_dup(eh, "Content-Length:", contlen);
+ k_hashtable_put_dup(eh, "CUX:", cux);
+ ni_event* evc=ni_event_new(pub, 0, eh, 0);
+ chan->context=evc;
+
+ int constant=k_hashtable_is(ent_head, "CUX:", "C");
+ if(constant){
+ char* path=make_cache_path(pub); if(!path) return;
+ k_file_read(".", path, USE_MMAP, cl, got_mmap, chan);
+ }
+ else{
+ char* data=k_malloc(cl);
+ set_read_buffer(chan, data, cl, evc);
+ }
+ }
+}
+
+void got_mmap(char* basedir,
+ char* path,
+ char* data,
+ int usedmmap,
+ k_stat kstat,
+ void* context){
+
+ k_free(path);
+ k_channel* chan=context;
+ ni_event* evt=chan->context;
+ if(!evt){ k_log_err("got_mmap: evt=0"); return; }
+ if(!data || !usedmmap){ k_log_err("got_mmap: mmap failed"); return; }
+
+ size_t cl=k_hashtable_get_int(evt->ent_head, "Content-Length:");
+ set_read_buffer(chan, data, cl, evt);
+}
+
+void set_read_buffer(k_channel* chan, char* data, size_t cl, ni_event* evt)
+{
+ evt->entity=data;
+ int r=k_channel_setbuf(chan, data, cl);
+ if(0) k_log_out("k_channel_setbuf %d", r);
+ if(r==BUFFER_ALREADY_SET){
+ k_log_err("oops! k_channel_setbuf BUFFER_ALREADY_SET");
+ return;
+ }
+ if(r==BUFFER_FILLED){
+ k_hashtable_set(evt->ent_head, "Status:", "200");
+ k_hashtable_set(evt->ent_head, "Status-Text:", "OK");
+ chan->context=0;
+ k_event_post("ni", evt);
+ }
+}
+
+int recv_entity(k_channel* chan, int bufpos, int eof)
+{
+ ni_event* evt=chan->context;
+ k_hashtable* ent_head=evt->ent_head;
+
+ char* cls=k_hashtable_get( ent_head, "Content-Length:");
+ int cl =k_hashtable_get_int(ent_head, "Content-Length:");
+
+ if(!cls && !eof) return -1;
+
+ int partial=0;
+ int eofcontlen=eof && (!cls || bufpos < cl);
+ if(eofcontlen){
+ if(cls){
+ char* clg=k_strdup(cls);
+ k_hashtable_put(ent_head, "Content-Length-Given:", clg);
+ partial=1;
+ }
+ cl=bufpos;
+ char b[32]; snprintf(b, 32, "%d", cl);
+ k_hashtable_put_dup(ent_head, "Content-Length:", b);
+ }
+
+ if(bufpos < cl){
+ if(bufpos){
+ ni_event* evp=ni_event_dup(evt);
+ snprintf(tmpbuf, TMPBUFSIZE, "0-%d", bufpos);
+ char* cr=k_strdup(tmpbuf);
+ k_hashtable_put(evp->ent_head, "Content-Range:", cr);
+ k_event_post("ni", evp);
+ }
+ return -1;
+ }
+
+ static char dummy_empty_entity[0];
+ if(!k_channel_getbuf(chan)){
+ int cn=k_hashtable_is(ent_head, "CUX:", "C");
+ if(cl) evt->entity=k_channel_chop_len(chan, cl);
+ else evt->entity=cn? dummy_empty_entity: k_malloc(1);
+ }
+
+ if(!partial){
+ k_hashtable_set(ent_head, "Status:", "200");
+ k_hashtable_set(ent_head, "Status-Text:", "OK");
+ }
+ chan->context=0;
+ k_event_post("ni", evt);
+
+ return cl;
+}
+
+int expecting_response(char* pub, ni_event* evt, k_channel* chan)
+{
+ if(pub && 0){
+ k_log_err("unwanted response: %s", pub);
+ ni_event_delete(evt);
+ k_channel_close(chan);
+ return 0;
+ }
+ return 1;
+}
+
+/* -}{---- Sending ---------------------------------------------------------- */
+
+void do_request(ni_event* evq)
+{
+ k_hashtable* sub=evq->ent_head;
+ int tc=k_hashtable_isi(sub, "Sub-Type:", "Cache");
+ int to=k_hashtable_isi(sub, "Sub-Type:", "Original");
+
+ if(tc){
+ char* ims=k_hashtable_get(sub, "If-Modified-Since:");
+ if(ims) ensure_self_sub(evq);
+ else look_in_file_cache(evq);
+ }
+ else
+ if(to){
+ send_request(evq);
+ }
+}
+
+void ensure_self_sub(ni_event* evq)
+{
+ k_hashtable* sub=evq->ent_head;
+ char* pub=k_hashtable_get(sub, "Sub-To:");
+
+ ni_resource* res=own_resource(pub);
+ k_hashtable* enh=res->ent_head;
+ k_hashtable* selfsub=k_hashtable_get(enh, "Sub-To:");
+ if(selfsub && !k_hashtable_is(selfsub, "Status-Cache:", "OK")){
+ k_log_err("cancel selfsub as new one needed");
+ }
+
+ k_hashtable* ss=k_hashtable_dup(sub);
+ k_hashtable_remove( ss, "From:");
+ k_hashtable_put_dup(ss, "URI:", pub);
+ k_hashtable_set( ss, "Sub-Type:", "Original");
+ k_hashtable_put_dup(ss, "Via:", get_host_for(pub));
+ if(k_hashtable_get( ss, "If-Modified-Since:")){
+ char* lm=k_hashtable_get(enh, "Last-Modified:");
+ if(!res->entity) lm=0;
+ k_hashtable_set(ss, "If-Modified-Since:", lm? lm: "0");
+ }
+ ni_event* evs=ni_event_new(0, 0, ss, 0);
+ k_event_post("ni", evs);
+
+ ni_event_delete(evq);
+}
+
+void ping_resource_subs(void* arg, char* key, void* val)
+{
+ ni_resource* res=val;
+ k_hashtable* pubcache=k_hashtable_get(res->ent_head, "Pub-Cache:");
+ if(!pubcache || !k_hashtable_get(pubcache, "Method:")) return;
+ k_hashtable* subs=k_hashtable_get(res->ent_head, "Sub-To:");
+ k_hashtable* sub;
+ for(sub=subs; sub; sub=sub->next){
+ if(!k_hashtable_is(sub, "Status-Cache:", "OK")){
+ if(!k_hashtable_get(sub, "Status:")){
+ ping_sub(res, sub);
+ }
+ else{
+ int ts=k_hashtable_get_int(sub, "Timestamp:");
+ if(0) k_log_out("check dried-up request: %d", ts);
+ }
+ }
+ }
+}
+
+void ping_sub(ni_resource* res, k_hashtable* sub)
+{
+ ni_resource_show(res, "ping_resource_subs");
+
+ k_hashtable* ss=k_hashtable_dup(sub);
+
+ char* subto=k_hashtable_extract(ss, "URI:");
+ k_hashtable_put_dup(ss, "URI:", res->uri);
+ k_hashtable_put( ss, "Sub-To:", subto);
+ k_hashtable_set( ss, "Sub-Type:", "Original");
+ k_hashtable_put_dup(ss, "Via:", get_host_for(res->uri));
+
+ ni_event* evs=ni_event_new(0, 0, ss, 0);
+ k_event_post("ni", evs);
+}
+
+ni_resource* own_resource(char* uri)
+{
+ ni_resource* res=k_hashtable_get(own_resources, uri);
+ if(!res){
+ res=ni_resource_get(uri);
+ k_hashtable_set(own_resources, uri, res);
+ }
+ return res;
+}
+
+void send_request(ni_event* evt)
+{
+ k_hashtable* eh=evt->ent_head;
+ char* method=k_strdup(k_hashtable_get(eh, "Method:"));
+ char* to =k_strdup(k_hashtable_get(eh, "Sub-To:"));
+ char* via =k_strdup(k_hashtable_get(eh, "Via:"));
+
+ char* chanm=get_channel_for(via);
+ if(!chanm) goto free_and_return;
+
+ k_channel* chan=ensure_chan(chanm);
+ if(!chan) goto free_and_return;
+
+ ni_fix_ni_headers(eh, 0);
+ ni_request(evt, to, method, chan);
+
+ free_and_return:
+ k_free(method); k_free(to); k_free(via);
+ ni_event_delete(evt);
+}
+
+void send_response(ni_event* evt)
+{
+ ni_event_show(evt, "send_response");
+
+ k_hashtable* eh=evt->ent_head;
+
+ k_hashtable* sub=k_hashtable_get(eh, "Pub-To:");
+ char* uri =k_hashtable_get(sub, "URI:");
+ char* from =k_hashtable_get(sub, "From:");
+ char* method =k_hashtable_get(sub, "Method:");
+ int methead =k_hashtable_is( sub, "Method:", "HEAD");
+
+ char* to=from? from: uri;
+
+ char* host=from? from: get_host_for(uri);
+ char* chanm=get_channel_for(host);
+ if(!chanm){
+ if(0) k_log_out("no ni protocol channel %s", to);
+ ni_event_delete(evt);
+ return;
+ }
+
+ k_channel* chan=ensure_chan(chanm);
+ if(!chan){
+ if(0) k_log_out("no ni protocol channel %s", to);
+ ni_event_delete(evt);
+ return;
+ }
+
+ k_hashtable_extract(eh, "Pub-To:");
+
+ char* protocol="ni/0.5";
+
+ ni_fix_ni_headers(eh, methead);
+ ni_response(evt, to, method, protocol, 0, chan);
+
+ k_hashtable_delete(sub);
+ evt->entity=0;
+ ni_event_delete(evt);
+}
+
+k_channel* ensure_chan(char* chanm)
+{
+ k_channel* chan=k_channel_get_name(chanm);
+ if(!chan){
+ k_log_err("Cannot find current channel for %s", chanm);
+ k_channel_connect_name(chanm, connection_readable,
+ connection_writable);
+ }
+ return chan;
+}
+
+/* -}{----------------------------------------------------------------------- */
+